Basically, I have to analyze some complex JSONs on HDFS with Spark.
I use for comprehensions to (pre)filter the JSONs and json4s's extract method to wrap them into a case class.
This one works fine:
import org.apache.spark.rdd.RDD
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

def foo(rdd: RDD[String]) = {
  case class Time($numberLong: String)
  case class View(C: String, b: Option[Array[List[String]]], t: Time)
  implicit val formats = DefaultFormats
  rdd.map { jsonString =>
    val jsonObj = parse(jsonString)
    val listsOfView = for {
      JObject(value) <- jsonObj
      JField("v", JObject(views)) <- value
      normalized <- views.map(_._2)
    } yield normalized
    listsOfView
  }
}
So far, so good! But when I try to extract the (pre)filtered JSON into my case class, I get this:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.json4s.DefaultFormats$

Here is the code with the extraction:
def foo(rdd: RDD[String]) = {
  case class Time($numberLong: String)
  case class View(C: String, b: Option[Array[List[String]]], t: Time)
  implicit val formats = DefaultFormats
  rdd.map { jsonString =>
    val jsonObj = parse(jsonString)
    val listsOfView = for {
      JObject(value) <- jsonObj
      JField("v", JObject(views)) <- value
      normalized <- views.map(_._2)
    } yield normalized.extract[View]
    listsOfView
  }
}
I have already tried my code in a Scala worksheet, and it works! I'm really new to HDFS and Spark, so I'd appreciate a hint.
Spark serializes the closures used in RDD transformations and "ships" them to the workers for distributed execution. That mandates that all code within the closure (and often also in the containing object) must be serializable.
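Why this matters can be sketched without a cluster: plain Java serialization, which Spark uses for closures by default, fails the same way when a function value captures a non-serializable object. `NonSerializableFormats` below is a hypothetical stand-in for `DefaultFormats`, not the real json4s type:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.json4s.DefaultFormats$: a plain class
// that does not implement java.io.Serializable.
class NonSerializableFormats {
  val name = "DefaultFormats"
}

object ClosureShippingDemo {
  // This closure captures the `formats` parameter, just like the rdd.map
  // closure in the question captures the implicit val from its scope.
  def capturing(formats: NonSerializableFormats): String => Int =
    s => s.length + formats.name.length

  // This closure captures nothing problematic.
  val clean: String => Int = s => s.length

  // Tries to serialize `obj` the way Spark would serialize a task closure.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(serializes(capturing(new NonSerializableFormats))) // false
    println(serializes(clean))                                 // true
  }
}
```

Serializing the capturing closure throws the same `java.io.NotSerializableException` that surfaces in the Spark stack trace above.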
Looking at the implementation of org.json4s.DefaultFormats$ (the companion object of that trait):
object DefaultFormats extends DefaultFormats {
  val losslessDate = new ThreadLocal(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
  val UTC = TimeZone.getTimeZone("UTC")
}
It's clear that this object is not serializable and cannot be made so (ThreadLocal is, by its very nature, non-serializable).
You don't seem to be using Date types in your code, so could you get rid of implicit val formats = DefaultFormats, or replace DefaultFormats with something serializable?
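The general workaround can also be sketched without Spark or json4s (`LocalFormats` is again a hypothetical stand-in): instead of capturing the non-serializable value from the enclosing scope, construct it inside the function body, so nothing problematic travels with the closure:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable Formats value.
class LocalFormats {
  val dateHint = "yyyy-MM-dd"
}

object InsideClosureDemo {
  // BAD: the closure captures a LocalFormats instance created on the "driver".
  def capturing(formats: LocalFormats): String => String =
    s => s + "/" + formats.dateHint

  // GOOD: the instance is created inside the closure body each time it runs,
  // so each worker builds its own copy and nothing is captured.
  val selfContained: String => String = { s =>
    val formats = new LocalFormats
    s + "/" + formats.dateHint
  }

  // Tries to serialize `obj` the way Spark would serialize a task closure.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(serializes(capturing(new LocalFormats))) // false: drags LocalFormats along
    println(serializes(selfContained))               // true: nothing bad captured
  }
}
```

The second pattern is exactly what the accepted fix below does with implicit val formats = DefaultFormats inside the RDD closure.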
This has actually been fixed now: json4s is serializable as of version 3.3.0. See https://github.com/json4s/json4s/issues/137
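If you can upgrade, bumping the json4s dependency to 3.3.0 or later is the simplest fix. A build.sbt sketch (the artifact name is an assumption; use whichever json4s module, e.g. json4s-jackson or json4s-native, your project already depends on):

```scala
// build.sbt: json4s 3.3.0+ has serializable Formats (json4s issue #137)
libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.3.0"
```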
What solved my issue was declaring implicit val formats = DefaultFormats inside the rdd.foreach {} loop. That resolved my serialization exception.
Here's my code snippet which solved the issue:
case class rfId(rfId: String)

// ... some code here ...

rdd.foreach { record =>
  val value = record.value()
  // Declaring the formats here, inside the closure, means the
  // non-serializable DefaultFormats is created on the worker
  // rather than captured and shipped from the driver.
  implicit val formats = DefaultFormats
  val json = parse(value)
  println(json.camelizeKeys.extract[rfId]) // Prints `rfId(ABC12345678)`
}