
Spark non-serializable exception when parsing JSON with json4s

I've run into an issue when attempting to parse JSON in my Spark job. I'm using Spark 1.1.0, json4s, and the Cassandra Spark Connector. The exception thrown is:

java.io.NotSerializableException: org.json4s.DefaultFormats

Examining the DefaultFormats companion object, and from this Stack Overflow question, it is clear that DefaultFormats cannot be serialized. The question now is what to do.
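To make the failure mode concrete, here is a minimal sketch (plain JDK serialization, no Spark, with a hypothetical `Formats` class standing in for `DefaultFormats`) of why a closure that captures a non-serializable local value cannot be shipped to executors:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for org.json4s.DefaultFormats: a class that is not Serializable.
class Formats

object CaptureDemo {
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: java.io.NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val formats = new Formats
    // The closure captures the local `formats`, so serializing the closure
    // (as Spark does when shipping a task) must serialize `formats` too,
    // which throws NotSerializableException.
    val parse: String => Int = s => { formats.hashCode; s.length }
    println(serializes(parse))
  }
}
```

Spark does exactly this serialization step when it ships the function passed to `map` to the executors, which is why the exception names the captured value's class rather than your own code.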

I can see that this ticket has apparently addressed the issue in the Spark code base by adding the transient keyword, yet I am not sure exactly how or where to apply it in my case. Is the solution to only instantiate the DefaultFormats class on the executors, to avoid serialization altogether? Is there another JSON parsing library for Scala/Spark that people are using? I initially tried using Jackson by itself, but ran into some errors with annotations that I could not resolve easily, and json4s worked out of the box. Here is my code:

import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats = DefaultFormats

// checkUa does the actual JSON parsing
val count = rdd.map(r => checkUa(r._2, r._1)).reduce((x, y) => x + y)

I do my JSON parsing in the checkUa function. I tried making count lazy, in hopes that it would delay execution somehow, but it had no effect. Perhaps moving the implicit val inside checkUa? Any advice is much appreciated.

Asked Apr 15 '15 by worker1138

2 Answers

This was already answered in an open ticket with json4s. The workaround is to put the implicit declaration inside the function that runs on the executors:

val count = rdd
               .map(r => {implicit val formats = DefaultFormats; checkUa(r._2, r._1)})
               .reduce((x, y) => x + y) 
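Another pattern worth knowing (not from the json4s ticket; a sketch using plain JDK serialization and a hypothetical `UaChecker`/`Formats` stand-in) is the `@transient lazy val` idiom the asker saw in the Spark ticket: the field is skipped during serialization and rebuilt on first use after deserialization, i.e. on the executor:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Stand-in for the non-serializable DefaultFormats.
class Formats

// Hypothetical parser: formats live in a @transient lazy val, so they are
// not serialized with the instance and are re-created lazily after
// deserialization on the remote side.
class UaChecker extends Serializable {
  @transient lazy val formats: Formats = new Formats
  def checkUa(ua: String): Int = { formats.hashCode; ua.length }
}

object TransientDemo {
  // Simulates Spark shipping a task: serialize, then deserialize a copy.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buf = new ByteArrayOutputStream()
    new ObjectOutputStream(buf).writeObject(obj)
    new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val shipped = roundTrip(new UaChecker)
    // The lazy val re-initialises on the deserialized copy, so this works.
    println(shipped.checkUa("Mozilla/5.0"))
  }
}
```

This buys you one formats instance per deserialized copy instead of one per record, at the cost of a slightly less obvious declaration site.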
Answered Oct 14 '22 by Justin Pihony


I had the same error when I put the implicit val formats = ... declaration inside the method that contains the parsing, instead of declaring it on the object.

So this would throw an error:

object Application {

  //... Lots of other code here, which eventually calls
  // setupStream(...)

  def setupStream(streamingContext: StreamingContext,
                  brokers: String,
                  topologyTopicName: String) = {
    implicit val formats = DefaultFormats
    _createDStream(streamingContext, brokers, topologyTopicName)
      // Remove the message key, which is always null in our case
      .map(_._2)
      .map((json: String) => parse(json).camelizeKeys
        .extract[Record[TopologyMetadata, Unused]])
      .print()
  }
}

But this would be fine:

object Application {

  implicit val formats = DefaultFormats

  //... Lots of other code here, which eventually calls
  // setupStream(...)

  def setupStream(streamingContext: StreamingContext,
                  brokers: String,
                  topologyTopicName: String) = {
    _createDStream(streamingContext, brokers, topologyTopicName)
      // Remove the message key, which is always null in our case
      .map(_._2)
      .map((json: String) => parse(json).camelizeKeys
        .extract[Record[TopologyMetadata, Unused]])
      .print()
  }
}
Answered Oct 14 '22 by Daniel Zolnai