
NotSerializableException with json4s on Spark

Basically, I have to analyze some complex JSONs stored on HDFS with Spark.

I use for comprehensions to (pre)filter the JSONs and json4s's extract method to map them into a case class.

This one works fine:

import org.apache.spark.rdd.RDD
import org.json4s._
import org.json4s.jackson.JsonMethods.parse // or org.json4s.native.JsonMethods.parse

def foo(rdd: RDD[String]) = {

  case class Time($numberLong: String)
  case class View(C: String, b: Option[Array[List[String]]], t: Time)
  implicit val formats = DefaultFormats

  rdd.map { jsonString =>
    val jsonObj = parse(jsonString)
    val listsOfView = for {
      JObject(value) <- jsonObj
      JField("v", JObject(views)) <- value
      normalized <- views.map(_._2)
    } yield normalized
    listsOfView
  }
}

So far so good!

When I try to extract the (pre)filtered JSON into my case class, I get this:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.json4s.DefaultFormats$

Here is the code with the extraction:

def foo(rdd: RDD[String]) = {

  case class Time($numberLong: String)
  case class View(C: String, b: Option[Array[List[String]]], t: Time)
  implicit val formats = DefaultFormats

  rdd.map { jsonString =>
    val jsonObj = parse(jsonString)
    val listsOfView = for {
      JObject(value) <- jsonObj
      JField("v", JObject(views)) <- value
      normalized <- views.map(_._2)
    } yield normalized.extract[View]
    listsOfView
  }
}

I have already tried my code in a Scala worksheet, and it works there. I'm really new to HDFS and Spark, so I would appreciate a hint.

asked Jul 16 '14 by Allquantor

3 Answers

Spark serializes the closures of RDD transformations and ships them to the workers for distributed execution. That requires all code within the closure (and often also in the containing object) to be serializable.
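As an illustration (a minimal local sketch with made-up names such as `ClosureCaptureDemo`, not the asker's code), the same failure can be reproduced with plain Java serialization: a closure that captures a non-serializable local value cannot be serialized, which is exactly what happens when Spark tries to ship the task:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Deliberately lacks the Serializable marker, like json4s's DefaultFormats$
class NotSerializableThing

object ClosureCaptureDemo {
  // Builds a closure over a local, non-serializable value — the same shape
  // as a Spark task closing over a non-serializable object from the driver.
  def makeClosure(): String => Int = {
    val captured = new NotSerializableThing
    s => s.length + captured.hashCode
  }

  // Java-serialize the object in memory and report whether it succeeded
  def canSerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Here `canSerialize(ClosureCaptureDemo.makeClosure())` is false, while a closure that captures nothing serializes fine; Spark performs the same serializability check when it cleans and ships the task closure, which is why the job aborts with "Task not serializable".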

Looking at the implementation of org.json4s.DefaultFormats$ (the companion object of that trait):

object DefaultFormats extends DefaultFormats {
  val losslessDate = new ThreadLocal(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
  val UTC = TimeZone.getTimeZone("UTC")
}

It's clear that this object is not serializable and cannot be made so: a ThreadLocal is by its very nature non-serializable.

You don't seem to be using Date types in your code, so could you get rid of implicit val formats = DefaultFormats, or replace DefaultFormats with something serializable?
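To illustrate the "replace by something serializable" route (a hedged sketch with invented holder names, not the json4s API): what matters is whether the object keeps a non-serializable field such as a ThreadLocal, or recreates the value on demand:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object FormatsSketch {
  // A holder with a ThreadLocal field, mirroring the DefaultFormats
  // companion shown above — Java serialization fails on the ThreadLocal.
  object ThreadLocalFormats extends Serializable {
    val losslessDate = new ThreadLocal[java.text.SimpleDateFormat]
  }

  // An equivalent holder that recreates the formatter per call and keeps
  // no non-serializable state — this one serializes fine.
  object RecreatingFormats extends Serializable {
    def losslessDate = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
  }

  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

In practice the simpler workaround is to create the Formats instance inside the transformation closure, so it is constructed on each worker rather than captured from the driver — the third answer below does exactly that.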

answered Nov 18 '22 by maasg


This has actually now been fixed; JSON4S is serializable as of version 3.3.0: https://github.com/json4s/json4s/issues/137
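So if upgrading is an option, a build change along these lines may be all that's needed (sbt shown as an assumption; pick the json4s module and version matching your stack, 3.3.0 or later per the issue above):

```scala
// build.sbt — any json4s backend at 3.3.0+ has Serializable Formats
libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.3.0"
```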

answered Nov 18 '22 by stefano


What solved my issue was moving implicit val formats = DefaultFormats inside the rdd.foreach {} loop. That resolved the serialization exception.

Here's my code snippet which solved the issue:

case class rfId(rfId: String)

// ... some code here ...

rdd.foreach { record =>
  val value = record.value()

  // Bring in default date formats etc. and make json4s usable inside the task
  implicit val formats = DefaultFormats
  val json = parse(value)
  println(json.camelizeKeys.extract[rfId])  // Prints `rfId(ABC12345678)`
}
answered Nov 18 '22 by Abhay Bh