
Direct Kafka Stream with PySpark (Apache Spark 1.6)

I'm trying to leverage the direct Kafka consumer (a feature newly available in Python) to capture data from a custom Kafka producer that I'm running on localhost:9092.

I'm currently using "direct_kafka_wordcount.py" as provided with the Spark 1.6 example scripts.

Source: https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py

DOCS: http://spark.apache.org/docs/latest/streaming-kafka-integration.html
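
For reference, this is roughly what the example script does (a trimmed sketch; the real script reads the broker list and topic from sys.argv, here they are hard-coded to the values from my command):

    # Trimmed sketch of direct_kafka_wordcount.py with broker/topic hard-coded
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)  # 2-second batches

    # Direct approach: no receiver, offsets are managed by the stream itself
    kvs = KafkaUtils.createDirectStream(
        ssc, ["twitter.live"], {"metadata.broker.list": "localhost:9092"})

    lines = kvs.map(lambda x: x[1])            # keep only the message value
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()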

I'm using the following command to run the program:

    ~/spark-1.6.0/bin/spark-submit --jars \
      ~/spark-1.6.0/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.6.0.jar \
      direct_kafka_wordcount.py localhost:9092 twitter.live

Unfortunately, I'm getting a strange error, which I'm not able to debug. Any tips/suggestions will be immensely appreciated.

py4j.protocol.Py4JJavaError: An error occurred while calling o24.createDirectStreamWithoutMessageHandler.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:720)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:688)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:745)
asked Feb 27 '16 by cynical biscuit


People also ask

How do you integrate Kafka with PySpark?

We first create a SparkSession, which provides a single point of entry for interacting with the underlying Spark functionality and allows programming Spark with the DataFrame and Dataset APIs. To read from Kafka for streaming queries, we can use spark.readStream, as sketched below.
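
A minimal sketch using the newer Structured Streaming API (Spark 2.x+, not the Spark 1.6 DStream API from the question); it assumes the spark-sql-kafka package is on the classpath, and reuses the broker and topic names from the question:

    # Minimal Structured Streaming sketch (Spark 2.x+); broker/topic taken from the question
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("KafkaStructuredRead").getOrCreate()

    df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "twitter.live")
          .load())

    # Kafka delivers key/value as binary; cast the value to a string to work with it
    lines = df.selectExpr("CAST(value AS STRING)")

    query = lines.writeStream.format("console").start()
    query.awaitTermination()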

How does Spark streaming work with Kafka?

The Spark Streaming and Kafka integration provides parallelism between Kafka partitions and Spark partitions, along with access to Kafka metadata and offsets. A direct stream can also be created for an input stream to pull messages directly from Kafka.

Can Spark be used with Kafka?

Kafka can serve as the messaging and integration platform for Spark Streaming: it acts as the central hub for real-time streams of data, which are then processed using complex algorithms in Spark Streaming.

What is the primary difference between Kafka streams and Spark streaming?

Kafka analyses the events as they unfold. As a result, it employs a continuous (event-at-a-time) processing model. Spark, on the other hand, uses a micro-batch processing approach, which divides incoming streams into small batches for processing.



2 Answers

The error:

java.nio.channels.ClosedChannelException

means that the topic does not exist, the brokers are not reachable, or there is some network (proxy) issue.

Make sure there is no such connectivity issue by running kafka-console-consumer on the Spark master and worker nodes, for example as shown below.
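
For example (paths and flags depend on your Kafka version; 0.8/0.9-era console consumers use --zookeeper, newer ones use --bootstrap-server):

    # Run from the Kafka installation directory on each node; adjust paths/flags to your setup
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic twitter.live --from-beginning
    # On newer Kafka versions:
    # bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic twitter.live --from-beginning

If messages show up there but not in Spark, the problem is likely on the Spark side; if they don't, fix the broker or topic first.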

answered Nov 11 '22 by Mohitt


I had a similar problem, but it turned out to have a different solution: I was running different Scala versions for Spark and the Kafka assembly jar.

Once I used the same Scala version on both sides, PySpark was able to generate the classes.

I used the following:

Spark: spark-1.6.3-bin-hadoop2.6.tgz
spark-streaming-kafka: spark-streaming-kafka-assembly_2.10-1.6.3.jar
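
With those in place, the submit command looks like the one in the question, just with matching 1.6.3 versions (the paths here are assumed; adjust them to wherever your Spark install and assembly jar actually live):

    ~/spark-1.6.3/bin/spark-submit --jars \
      spark-streaming-kafka-assembly_2.10-1.6.3.jar \
      direct_kafka_wordcount.py localhost:9092 twitter.live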

answered Nov 11 '22 by chandank