Kafka with Spark 2.1 Structured Streaming - cannot deserialize

With Apache Spark 2.1, I would like to use Kafka (0.10.0.2.5) as a source for Structured Streaming with pyspark.

In the Kafka topic, I have JSON messages (pushed with StreamSets Data Collector). However, I am not able to read them with the following code:

kafka = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:6667") \
    .option("subscribe", "mytopic") \
    .load()
msg = kafka.selectExpr("CAST(value AS STRING)")
disp = msg.writeStream.outputMode("append").format("console").start()

It generates this error:

 java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer

I tried adding the following options to the readStream call:

.option("value.serializer","org.common.serialization.StringSerializer")
.option("key.serializer","org.common.serialization.StringSerializer")

But it does not solve the problem.

Any idea? Thanks in advance.

JS G. asked Mar 30 '17

People also ask

What is the difference between spark streaming and structured streaming?

Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. In the end, both APIs are optimized by the Spark Catalyst optimizer and translated into RDDs for execution under the hood.

How to send data from Spark to Kafka?

The Spark-Kafka integration depends on the Spark, Spark Streaming, and Spark-Kafka integration jars. Create a new file build.sbt and specify the application details and its dependencies. sbt will download the necessary jars while compiling and packaging the application.
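The build.sbt described above can be sketched as follows; the project name and version are placeholders, and the library versions are assumptions matching Spark 2.1 built against Scala 2.11:

```scala
name := "spark-kafka-demo"
version := "0.1"
scalaVersion := "2.11.8"

// Spark SQL plus the Kafka source for Structured Streaming;
// sbt resolves and downloads these jars at build time
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.1.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0"
)
```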

How Spark reads data from Kafka?

To read from Kafka for streaming queries, we can use SparkSession.readStream. Kafka server addresses and topic names are required. Spark can subscribe to one or more topics, and wildcards can be used to match multiple topic names, similar to the batch query example provided above.
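In the Kafka source, the value column arrives as raw bytes; CAST(value AS STRING) UTF-8-decodes it, after which the JSON payload can be parsed (in Spark itself this is typically done with from_json and a schema). A plain-Python sketch of that decoding step, using a made-up sample message:

```python
import json

# Hypothetical raw Kafka record value, as the source delivers it: a byte payload
raw_value = b'{"id": 1, "name": "sensor-a"}'

# Equivalent of CAST(value AS STRING): UTF-8-decode the bytes into text
as_string = raw_value.decode("utf-8")

# Parse the JSON text into a dict (Spark would use from_json with a schema)
record = json.loads(as_string)
print(record["name"])  # -> sensor-a
```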


1 Answer

Actually, I found the solution: I added the following jar as a dependency:

spark-streaming-kafka-0-10-assembly_2.10-2.1.0.jar

(after downloading it from https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10-assembly_2.10/2.1.0)
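As an alternative to downloading the jar by hand, the dependency can be resolved at submit time with --packages. A minimal sketch (the script name is hypothetical; the coordinates assume Spark 2.1 on Scala 2.11 — note that for Structured Streaming the usual artifact is spark-sql-kafka-0-10, rather than the spark-streaming-kafka-0-10 jar used above):

```shell
# Maven coordinates of the Kafka source for Structured Streaming
PACKAGES="org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0"

# Spark resolves the package and its transitive deps (e.g. kafka-clients,
# which provides the missing ByteArrayDeserializer) at launch:
# spark-submit --packages "$PACKAGES" my_streaming_job.py
echo "$PACKAGES"
```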

JS G. answered Sep 18 '22