Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark streaming + json4s-jackson dependency problems

I am unable to use json4s-Jackson 3.2.11 within my spark 1.4.1 Streaming application.

Thinking that it was the existing dependency within the spark-core project that is causing the problem as explained here -> Is it possible to use json4s 3.2.11 with Spark 1.3.0? I have built Spark from source with an adjusted core/pom.xml. I have changed the reference from json4s-jackson_2.10:3.2.10 to 3.2.11, as the 2.10 version does not support extracting to implicit types.

I have replaced the source jars that are referenced in my intellij IDEA project with the rebuilt jars, however I am still getting the same errors as before. I fear that Spark must still be referencing json4s 3.2.10 somehow?

here is my simple test:

object StreamingPredictor {

  implicit val formats = DefaultFormats

  case class event(Key: String,
                   sensorId: String,
                   sessionId: String,
                   deviceId: String,
                   playerId: String,
                   impressionId: String,
                   time: String,
                   eventName: String,
                   eventProperties: Map[String, Any],
                   dl: Array[List[(String, Any)]],
                   $post: Boolean,
                   $sync: Boolean)

  def parser(json: String): String = {
    val parsedJson = parse(json)
    val foo = parsedJson.extract[event]
    foo.eventName
  }

  def main(args: Array[String]) {

    val zkQuorum = "localhost:2181"
    val group = "myGroup"
    val topic = Map("test" -> 1)
    val sparkContext = new SparkContext("local[4]","KafkaConsumer")
    val ssc = new StreamingContext(sparkContext, Seconds(1))

    val json = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
    val eventName = json.map(_._2).map(parser)

    eventName.print()

    ssc.start()

  }
}

The error I get when referencing json4s 3.2.11 in my application pom.xml file:

java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.render(Lorg/json4s/JsonAST$JValue;)Lorg/json4s/JsonAST$JValue;
        at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
        at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:143)
        at org.apache.spark.scheduler.EventLoggingListener.onBlockManagerAdded(EventLoggingListener.scala:174)
        at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:46)
        at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
        at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
        at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56)
        at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:79)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)

And the error I get when i use json4s-jackson_2.10:3.2.10 in my application pom.xml file:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.json4s.package$MappingException: No usable value for eventProperties
No information known about type
        at org.json4s.reflect.package$.fail(package.scala:96)
        at org.json4s.Extraction$ClassInstanceBuilder.org$json4s$Extraction$ClassInstanceBuilder$$buildCtorArg(Extraction.scala:443)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$14.apply(Extraction.scala:463)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$14.apply(Extraction.scala:463)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.json4s.Extraction$ClassInstanceBuilder.org$json4s$Extraction$ClassInstanceBuilder$$instantiate(Extraction.scala:451)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$result$6.apply(Extraction.scala:491)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$result$6.apply(Extraction.scala:488)
        at org.json4s.Extraction$.org$json4s$Extraction$$customOrElse(Extraction.scala:500)
        at org.json4s.Extraction$ClassInstanceBuilder.result(Extraction.scala:488)
        at org.json4s.Extraction$.extract(Extraction.scala:332)
        at org.json4s.Extraction$.extract(Extraction.scala:42)
        at org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
        at com.pca.triggar.Streaming.StreamingPredictor$.parser(StreamingPredictor.scala:38)
        at com.pca.triggar.Streaming.StreamingPredictor$$anonfun$2.apply(StreamingPredictor.scala:57)
        at com.pca.triggar.Streaming.StreamingPredictor$$anonfun$2.apply(StreamingPredictor.scala:57)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1276)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1276)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.json4s.package$MappingException: No information known about type
        at org.json4s.Extraction$ClassInstanceBuilder.org$json4s$Extraction$ClassInstanceBuilder$$instantiate(Extraction.scala:465)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$result$6.apply(Extraction.scala:491)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$result$6.apply(Extraction.scala:488)
        at org.json4s.Extraction$.org$json4s$Extraction$$customOrElse(Extraction.scala:500)
        at org.json4s.Extraction$ClassInstanceBuilder.result(Extraction.scala:488)
        at org.json4s.Extraction$.extract(Extraction.scala:332)
        at org.json4s.Extraction$$anonfun$extract$5.apply(Extraction.scala:316)
        at org.json4s.Extraction$$anonfun$extract$5.apply(Extraction.scala:316)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.json4s.Extraction$.extract(Extraction.scala:316)
        at org.json4s.Extraction$ClassInstanceBuilder.org$json4s$Extraction$ClassInstanceBuilder$$buildCtorArg(Extraction.scala:431)
        ... 42 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
like image 299
Gillespie Avatar asked Sep 04 '15 14:09

Gillespie


1 Answers

Ok, I found the problem. As posted else where, you need to compile against jason4s 3.2.10. Apparently, doing so generates a binary which would then work with Spark (version 1.5 in my case. Same in some earlier versions as well). It has to do with the default parameter in the render() method which shows up in 3.2.11.

like image 79
bhomass Avatar answered Oct 03 '22 14:10

bhomass