I've run into an issue when attempting to parse JSON in my Spark job. I'm using Spark 1.1.0, json4s, and the Cassandra Spark Connector. The exception thrown is:
java.io.NotSerializableException: org.json4s.DefaultFormats
Examining the DefaultFormats companion object, and with the help of this Stack Overflow question, it is clear that DefaultFormats cannot be serialized. The question now is what to do.
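You can reproduce the failure outside Spark with plain Java serialization (at least on the json4s version I'm using), which shows it isn't Spark-specific:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import org.json4s.DefaultFormats

val oos = new ObjectOutputStream(new ByteArrayOutputStream())
// Throws the same java.io.NotSerializableException as the Spark job
oos.writeObject(DefaultFormats)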
I can see this ticket has apparently addressed this issue in the Spark code base, by adding the keyword transient, yet I am not sure exactly how or where to apply it to my case. Is the solution to only instantiate the DefaultFormats class on the executors, to avoid serialization altogether? Is there another JSON parsing library for Scala/Spark that people are using? I initially tried using Jackson by itself, but ran into some errors with annotations that I could not resolve easily, and json4s worked out of the box. Here is my code:
import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats
val count = rdd.map(r => checkUa(r._2, r._1)).reduce((x, y) => x + y)
I do my JSON parsing in the checkUa function. I tried making count lazy, in the hope that it would somehow delay execution, but it had no effect. Perhaps moving the implicit val inside checkUa, something like the sketch below? Any advice much appreciated.
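For concreteness, here is roughly what I mean by moving it inside (the real body of checkUa is elided; the "ua" field name is made up for illustration):

import org.json4s._
import org.json4s.jackson.JsonMethods._

def checkUa(json: String, ua: String): Long = {
  // Declared inside the function body, so it is only ever created on the
  // executor and is never captured by the serialized closure.
  implicit val formats = DefaultFormats
  if ((parse(json) \ "ua").extractOpt[String].exists(_ == ua)) 1L else 0L
}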
This was already answered in an open json4s ticket. The workaround is to put the implicit declaration inside the function:
val count = rdd
.map(r => {implicit val formats = DefaultFormats; checkUa(r._2, r._1)})
.reduce((x, y) => x + y)
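If instantiating a fresh DefaultFormats for every record is a concern, the same idea can be applied once per partition with mapPartitions (a sketch, assuming checkUa takes its Formats as an implicit parameter):

val count = rdd
  .mapPartitions { iter =>
    // Created once per partition, on the executor, so DefaultFormats
    // never crosses the driver-to-executor serialization boundary.
    implicit val formats = DefaultFormats
    iter.map(r => checkUa(r._2, r._1))
  }
  .reduce((x, y) => x + y)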
I had the same error when I put the implicit val formats = ... declaration inside the method that contains the parsing, instead of declaring it on the class (object).
So this would throw an error:
object Application {
  //... Lots of other code here, which eventually calls
  // setupStream(...)
  def setupStream(streamingContext: StreamingContext,
                  brokers: String,
                  topologyTopicName: String) = {
    implicit val formats = DefaultFormats
    _createDStream(streamingContext, brokers, topologyTopicName)
      // Remove the message key, which is always null in our case
      .map(_._2)
      .map((json: String) => parse(json).camelizeKeys
        .extract[Record[TopologyMetadata, Unused]])
      .print()
  }
}
But this would be fine:
object Application {
  implicit val formats = DefaultFormats
  //... Lots of other code here, which eventually calls
  // setupStream(...)
  def setupStream(streamingContext: StreamingContext,
                  brokers: String,
                  topologyTopicName: String) = {
    _createDStream(streamingContext, brokers, topologyTopicName)
      // Remove the message key, which is always null in our case
      .map(_._2)
      .map((json: String) => parse(json).camelizeKeys
        .extract[Record[TopologyMetadata, Unused]])
      .print()
  }
}
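The difference is closure capture: a val declared inside the method is a local variable, so the closures passed to map capture it and Spark must serialize it along with the task, which fails. A val declared on the object is not captured at all; the generated code simply references the singleton, which each executor initializes in its own JVM. If the formats have to live on a class instance that does get shipped to executors, a common Spark pattern (similar in spirit to the transient fix mentioned in the question) is to mark the field @transient lazy, so serialization skips it and it is re-created on first use. A sketch, with a hypothetical Parser class:

import org.json4s._
import org.json4s.jackson.JsonMethods._

class Parser extends Serializable {
  // Skipped during serialization; lazily re-initialized the first time
  // it is used on each JVM (driver or executor).
  @transient implicit lazy val formats: Formats = DefaultFormats

  def parseRecord(json: String): JValue = parse(json).camelizeKeys
}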