I'm trying to write some simple data into HBase (0.96.0-hadoop2) using Spark 1.0, but I keep getting serialization problems. Here is the relevant code:
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import java.util.Properties
import java.io.FileInputStream
import org.apache.hadoop.hbase.client.Put
object PutRawDataIntoHbase {
  def main(args: Array[String]): Unit = {
    var propFileName = "hbaseConfig.properties"
    if (args.size > 0) {
      propFileName = args(0)
    }

    /** Load properties here **/

    val theData = sc.textFile(prop.getProperty("hbase.input.filename"))
      .map(l => l.split("\t"))
      .map(a => Array("%010d".format(a(9).toInt) + "-" + a(0), a(1)))

    val tableName = prop.getProperty("hbase.table.name")
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.rootdir", prop.getProperty("hbase.rootdir"))
    hbaseConf.addResource(prop.getProperty("hbase.site.xml"))
    val myTable = new HTable(hbaseConf, tableName)

    theData.foreach { a =>
      var p = new Put(Bytes.toBytes(a(0)))
      p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
      myTable.put(p)
    }
  }
}
Running the code results in:
Failed to run foreach at putDataIntoHBase.scala:79
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException:org.apache.hadoop.hbase.client.HTable
Replacing the foreach with map doesn't crash, but it doesn't write anything either. Any help will be greatly appreciated.
The class HBaseConfiguration represents a pool of connections to the HBase servers. Obviously, it can't be serialized and sent to the worker nodes. Since HTable uses this pool to communicate with the HBase servers, it can't be serialized either.
Basically, there are three ways to handle this problem:
1. Create the HTable instance on the worker nodes themselves, so nothing HBase-related has to be serialized. Note the use of the foreachPartition method:
val tableName = prop.getProperty("hbase.table.name")
<......>
theData.foreachPartition { iter =>
  val hbaseConf = HBaseConfiguration.create()
  <... configure HBase ...>
  val myTable = new HTable(hbaseConf, tableName)
  iter.foreach { a =>
    val p = new Put(Bytes.toBytes(a(0)))
    p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
    myTable.put(p)
  }
}
Note that each of the worker nodes must have access to the HBase servers and must have the required jars preinstalled or provided via ADD_JARS.
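For instance, here is a rough sketch of shipping the HBase client jars with the application through SparkConf.setJars; the jar paths are purely illustrative and would need to match your installation:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative paths only: point these at your HBase 0.96.0-hadoop2 client jars.
val conf = new SparkConf()
  .setAppName("PutRawDataIntoHbase")
  .setJars(Seq(
    "/path/to/hbase-client-0.96.0-hadoop2.jar",
    "/path/to/hbase-common-0.96.0-hadoop2.jar",
    "/path/to/hbase-protocol-0.96.0-hadoop2.jar"
  ))
val sc = new SparkContext(conf)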
Also note that since a connection pool is opened for each partition, it would be a good idea to reduce the number of partitions to roughly the number of worker nodes (with the coalesce function), as sketched below. It's also possible to share a single HTable instance on each of the worker nodes, but that is not so trivial.
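A minimal sketch of the coalesce variant, assuming roughly four worker nodes and the same tableName and hbaseColFamily as in your code:

// Roughly one partition per worker, so only a handful of HBase connections are opened.
theData.coalesce(4).foreachPartition { iter =>
  val hbaseConf = HBaseConfiguration.create()
  // <... configure HBase as above ...>
  val myTable = new HTable(hbaseConf, tableName)
  iter.foreach { a =>
    val p = new Put(Bytes.toBytes(a(0)))
    p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
    myTable.put(p)
  }
  myTable.close() // flush any buffered puts and release the connection
}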
2. It's possible to write all the data from the RDD with a single machine, even if the data doesn't fit into memory; the details are explained in this answer: Spark: Best practice for retrieving big data from RDD to local machine (see the sketch below). Of course, it would be slower than distributed writing, but it's simple, doesn't bring painful serialization issues, and might be the best approach if the data size is reasonable.
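A minimal sketch of that single-machine approach, assuming the driver can reach the HBase servers and that your Spark version has RDD.toLocalIterator (it fetches one partition at a time, so the whole RDD never has to fit into driver memory):

// Everything below runs on the driver, so HTable is never serialized.
val hbaseConf = HBaseConfiguration.create()
// <... configure HBase as above ...>
val myTable = new HTable(hbaseConf, tableName)
theData.toLocalIterator.foreach { a =>
  val p = new Put(Bytes.toBytes(a(0)))
  p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
  myTable.put(p)
}
myTable.close()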
3. It's possible to create a custom Hadoop OutputFormat for HBase or to use an existing one, as sketched below. I'm not sure whether something exists that exactly fits your needs, but Google should help here.
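One existing option, sketched under the assumption that your Spark version provides saveAsNewAPIHadoopDataset, is HBase's own TableOutputFormat for the new MapReduce API (the key of each pair is ignored by TableOutputFormat; only the Put values are written):

import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job

val hbaseConf = HBaseConfiguration.create()
// <... configure HBase as above ...>
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(hbaseConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

theData
  .map { a =>
    // The Put objects are created on the executors, so nothing HBase-related is serialized.
    val p = new Put(Bytes.toBytes(a(0)))
    p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
    (new ImmutableBytesWritable(Bytes.toBytes(a(0))), p)
  }
  .saveAsNewAPIHadoopDataset(job.getConfiguration)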
P.S. By the way, the map call doesn't crash because it never gets evaluated: RDDs aren't evaluated until you invoke an action. For example, if you followed the map with an action such as count, it would crash too.
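Purely to illustrate the laziness (not something to run as-is): the closure still captures myTable, so the failure only surfaces at the first action:

// No job runs here, so nothing crashes yet:
val mapped = theData.map { a => myTable.put(new Put(Bytes.toBytes(a(0)))); a }
// The job is submitted only when an action runs; this is where the
// NotSerializableException for HTable shows up:
mapped.count()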