Enriching SparkContext without incurring serialization issues

I am trying to use Spark to process data that comes from HBase tables. This blog post gives an example of how to use NewHadoopAPI to read data from any Hadoop InputFormat.

What I have done

Since I will need to do this many times, I was trying to use implicits to enrich SparkContext, so that I can get an RDD from a given set of columns in HBase. I have written the following helper:

import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext

trait HBaseReadSupport {
  implicit def toHBaseSC(sc: SparkContext) = new HBaseSC(sc)

  implicit def bytes2string(bytes: Array[Byte]) = new String(bytes)
}


final class HBaseSC(sc: SparkContext) extends Serializable {
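  // Turn an HBase Result into Map(columnFamily -> Map(column -> interpreted value))
  // for the requested column families and columns.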
  def extract[A](data: Map[String, List[String]], result: Result, interpret: Array[Byte] => A) =
    data map { case (cf, columns) =>
      val content = columns map { column =>
        val cell = result.getColumnLatestCell(cf.getBytes, column.getBytes)

        column -> interpret(CellUtil.cloneValue(cell))
      } toMap

      cf -> content
    }

  def makeConf(table: String) = {
    val conf = HBaseConfiguration.create()

    conf.setBoolean("hbase.cluster.distributed", true)
    conf.setInt("hbase.client.scanner.caching", 10000)
    conf.set(TableInputFormat.INPUT_TABLE, table)

    conf
  }

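  // Read the requested columns of `table` as an RDD of
  // (rowkey, Map(columnFamily -> Map(column -> interpreted value))).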
  def hbase[A](table: String, data: Map[String, List[String]])
    (interpret: Array[Byte] => A) =

    sc.newAPIHadoopRDD(makeConf(table), classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result]) map { case (key, row) =>
        Bytes.toString(key.get) -> extract(data, row, interpret)
      }

}

It can be used like this:

val rdd = sc.hbase[String](table, Map(
  "cf" -> List("col1", "col2")
))

In this case we get an RDD of (String, Map[String, Map[String, String]]), where the first component is the rowkey and the second is a map whose keys are column families and whose values are maps from column names to cell values.
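For instance, individual cells can then be read off the nested maps (a sketch reusing the "cf"/"col1" names from the snippet above):

val col1ByRow = rdd map { case (rowKey, families) =>
  rowKey -> families("cf")("col1")
}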

Where it fails

Unfortunately, it seems that my job gets a reference to sc, which is itself not serializable by design. What I get when I run the job is

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)

If I remove the helper classes and use the same logic inline in my job, everything runs fine. But I would like something reusable instead of writing the same boilerplate over and over.

By the way, the issue is not specific to implicits; even a plain function taking sc exhibits the same problem.
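For instance, even a plain (non-implicit) wrapper such as this sketch, with made-up names, fails the same way, since the closure passed to map calls a method of the wrapper and thus captures this, and with it sc:

import org.apache.spark.SparkContext

class LineLengths(sc: SparkContext) extends Serializable {
  def lengths(path: String) =
    sc.textFile(path) map { line => clean(line).length } // closure captures `this`, hence sc

  private def clean(line: String) = line.trim
}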

For comparison, the following helper to read TSV files (I know it's broken as it does not support quoting and so on, never mind) seems to work fine:

import org.apache.spark.SparkContext

trait TsvReadSupport {
  implicit def toTsvRDD(sc: SparkContext) = new TsvRDD(sc)
}

final class TsvRDD(val sc: SparkContext) extends Serializable {
  def tsv(path: String, fields: Seq[String], separator: Char = '\t') = sc.textFile(path) map { line =>
    val contents = line.split(separator).toList

    (fields, contents).zipped.toMap
  }
}
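With the implicit conversion in scope, it is used in the same spirit (path and field names are just placeholders):

val people = sc.tsv("/data/people.tsv", Seq("name", "age"))
// each element is a Map like Map("name" -> "Alice", "age" -> "30")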

How can I encapsulate the logic to read rows from HBase without unintentionally capturing the SparkContext?

asked May 19 '14 by Andrea

1 Answer

Just add the @transient annotation to the sc variable:

final class HBaseSC(@transient val sc: SparkContext) extends Serializable {
  ...
}

and make sure sc is not used within the extract function, since it won't be available on the workers.

If it's necessary to access the Spark context from within a distributed computation, the rdd.context method can be used:

val rdd = sc.newAPIHadoopRDD(...)
rdd map {
  case (k, v) => 
    val ctx = rdd.context
    ....
}
answered Oct 05 '22 by Wildfire