I'm trying to leverage the direct Kafka consumer (a new feature available in Python) to capture data from a custom Kafka producer that I'm running on localhost:9092.
I'm currently using the direct_kafka_wordcount.py script provided with the Spark 1.6 examples.
Source: https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py
DOCS: http://spark.apache.org/docs/latest/streaming-kafka-integration.html
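For reference, the core of that example script looks roughly like this (trimmed from the linked source):

    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)  # 2-second batches

    brokers, topic = sys.argv[1:]
    # Direct (receiver-less) stream: Spark pulls the topic straight from the brokers
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    counts = kvs.map(lambda x: x[1]) \
        .flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()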
I'm using the following command to run the program:

    ~/spark-1.6.0/bin/spark-submit \
        --jars ~/spark-1.6.0/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.6.0.jar \
        direct_kafka_wordcount.py localhost:9092 twitter.live
Unfortunately, I'm getting an error that I'm not able to debug. Any tips or suggestions would be immensely appreciated.
py4j.protocol.Py4JJavaError: An error occurred while calling o24.createDirectStreamWithoutMessageHandler.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:720)
at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:688)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Kafka Consumer
We first create a Spark session. SparkSession provides a single point of entry for interacting with the underlying Spark functionality and allows programming Spark with the DataFrame and Dataset APIs. To read from Kafka for streaming queries, we can use spark.readStream.
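A minimal sketch of that read, assuming Spark 2.x+ with the spark-sql-kafka package on the classpath (this is the Structured Streaming API, not the Spark 1.6 DStream API used in the question):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("KafkaStructuredRead").getOrCreate()

    # Subscribe to the topic; Kafka delivers key/value as binary columns
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "twitter.live") \
        .load()

    messages = df.selectExpr("CAST(value AS STRING)")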
The Spark Streaming and Kafka integration provides parallelism between Kafka partitions and Spark partitions, along with access to Kafka metadata and offsets, as shown in the sketch below. A direct stream can also be created so that the input stream pulls messages directly from the Kafka brokers, without a receiver.
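For example, the integration guide linked above shows how a direct stream exposes the Kafka offsets of each batch; here directKafkaStream is assumed to be the DStream returned by KafkaUtils.createDirectStream:

    offsetRanges = []

    def storeOffsetRanges(rdd):
        # Each batch RDD of a direct stream is a KafkaRDD carrying its offset ranges
        global offsetRanges
        offsetRanges = rdd.offsetRanges()
        return rdd

    def printOffsetRanges(rdd):
        for o in offsetRanges:
            print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))

    directKafkaStream \
        .transform(storeOffsetRanges) \
        .foreachRDD(printOffsetRanges)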
Kafka is a popular messaging and integration platform for Spark Streaming. Kafka acts as the central hub for real-time streams of data, which are then processed with complex algorithms in Spark Streaming.
Kafka handles events as they unfold, employing a continuous (event-at-a-time) processing model. Spark, on the other hand, uses a micro-batch approach, which divides the incoming stream into small batches for processing.
The error

    java.nio.channels.ClosedChannelException

means that the topic does not exist, the brokers are not reachable, or there is some network (proxy) issue.
Rule out a connectivity problem by running kafka-console-consumer on the Spark master and worker nodes.
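For example (a sketch, assuming Kafka's scripts are on the path and, for the 0.8-era consumer that Spark 1.6's integration uses, a ZooKeeper instance at localhost:2181; newer Kafka versions take --bootstrap-server instead of --zookeeper):

    # check that the topic actually exists
    kafka-topics.sh --list --zookeeper localhost:2181

    # consume a few messages from each node that will run Spark
    kafka-console-consumer.sh --zookeeper localhost:2181 \
        --topic twitter.live --from-beginning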
I had a similar problem, but it turned out to have a different solution: I was running different Scala versions for Spark and the Kafka assembly.
Once I used the same Scala version on both sides, PySpark was able to load the classes.
I used the following:
Spark: spark-1.6.3-bin-hadoop2.6.tgz
spark-streaming-kafka: spark-streaming-kafka-assembly_2.10-1.6.3.jar
(The _2.10 in the artifact name is the Scala version and 1.6.3 is the Spark version; both must match your Spark build.)
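Putting that together, the submit command would look like this (a sketch assuming the same script and topic as in the question, with the jar path adjusted to wherever you keep it):

    ~/spark-1.6.3-bin-hadoop2.6/bin/spark-submit \
        --jars spark-streaming-kafka-assembly_2.10-1.6.3.jar \
        direct_kafka_wordcount.py localhost:9092 twitter.live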