 

Incompatible Jackson version: Spark Structured Streaming

I am trying to run a very simple Spark Structured Streaming word-count program that reads from a Kafka topic. Below is my code:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("KafkaWordCount")
  .config("spark.master", "local")
  .getOrCreate()

import spark.implicits._

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()

val lines = df.selectExpr("CAST(value AS STRING)").as[String]

val words = lines.flatMap(_.split(" "))

val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

When I run this program, I get the following exception:

Exception in thread "stream execution thread for [id = f704d6e5-14bf-4bd7-94a0-38c4b77986ea, runId = d277eaac-e18c-4128-954b-6a318bb8039c]" Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.map(RDD.scala:370)
    at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:287)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:394)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.4
    at com.fasterxml.jackson.module.scala.JacksonModule$class.setupModule(JacksonModule.scala:64)
    at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:19)
    at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:751)
    at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:82)
    at org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
    ... 28 more
org.apache.spark.sql.streaming.StreamingQueryException: null

Other Stack Overflow answers suggest including different versions of Jackson in the pom.xml, but this is an sbt project, not a Maven one. Below is my build.sbt:

name := "spark"

version := "0.1"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.3.1",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1",
  "org.apache.kafka" %% "kafka" % "1.1.0"
)

What do I need to do to fix this error and make the program work?

asked Jul 25 '18 by Piyush Shrivastava

1 Answer

Apache Spark 2.3.x is built against Jackson 2.6.7:

<fasterxml.jackson.version>2.6.7</fasterxml.jackson.version>
<fasterxml.jackson.databind.version>2.6.7.1</fasterxml.jackson.databind.version>

(see the Spark project's pom.xml)

Kafka, on the other hand, depends on the newer Jackson 2.9.x line (2.9.4 shows up in the stack trace above; Kafka's Gradle dependencies file pins 2.9.6):

versions += [
  activation: "1.1.1",
  apacheda: "1.0.0",
  apacheds: "2.0.0-M24",
  argparse4j: "0.7.0",
  bcpkix: "1.59",
  easymock: "3.6",
  jackson: "2.9.6",
  jetty: "9.2.24.v20180105",

(from Kafka's gradle/dependencies.gradle)
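
If you want to confirm which Jackson artifacts actually end up on the classpath before changing anything, you can print sbt's resolved dependency tree and look for the winning jackson-databind version. A minimal sketch, assuming an sbt version that ships the dependencyTree task (sbt 1.4+); on older sbt you would add the sbt-dependency-graph plugin first, and the plugin version below is only an example:

// project/plugins.sbt -- only needed on sbt versions that lack a built-in
// dependencyTree task (example version; verify against the plugin's docs)
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2")

// then, from the sbt shell, run:
//   dependencyTree
// and look for which jackson-databind version wins conflict resolution.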

To fix this, override the conflicting jackson-databind version in build.sbt so that it matches the version Spark expects:

dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7"
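
Applied to the build.sbt from the question, the whole file might then look like the sketch below (Spark 2.3.1's pom actually pins jackson-databind 2.6.7.1, so pinning 2.6.7.1 instead of 2.6.7 should work as well; verify against the Spark version you use):

name := "spark"

version := "0.1"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.3.1",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1",
  "org.apache.kafka" %% "kafka" % "1.1.0"
)

// Kafka pulls in Jackson 2.9.x transitively; force jackson-databind back to
// the 2.6.x line that Spark 2.3.x and its jackson-module-scala expect.
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7"

The check that throws "Incompatible Jackson version: 2.9.4" lives in jackson-module-scala's setupModule (visible in the stack trace), which refuses to register against a jackson-databind from a different version line; pinning databind back to 2.6.x makes the two agree again.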

For more details, see the sbt documentation on overriding a version:

https://www.scala-sbt.org/1.x/docs/Library-Management.html#Overriding+a+version

answered by Naga