I am trying to integrate Apache Kafka with Apache Spark Streaming using Python (I am new to all of this).
For this I have done the following steps. First, I listed my Kafka topics:
bin/kafka-topics.sh --list --zookeeper localhost:2181
Then I took the Kafka word-count example from
https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/kafka_wordcount.py
and the code is:
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
I submitted the job with
./spark-submit /root/girish/python/kafkawordcount.py localhost:2181
and got this error:
Traceback (most recent call last):
File "/root/girish/python/kafkawordcount.py", line 28, in <module>
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/pyspark/streaming/kafka.py", line 72, in createStream
raise e
py4j.protocol.Py4JJavaError: An error occurred while calling o23.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Based on the answer to "spark submit failed with spark streaming workdcount python code", I updated the command to
./spark-submit --jars /root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/lib/spark-streaming-kafka_2.10-1.3.1.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/kafka_2.10-0.8.1.2.2.0.0-2041.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/zkclient-0.3.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/metrics-core-2.2.0.jar /root/girish/python/kafkawordcount.py localhost:2181 <topic name>
Now I am getting this error
File "/root/girish/python/kafkawordcount.py", line 28, in <module>
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/pyspark/streaming/kafka.py", line 67, in createStream
jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 529, in __call__
File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 265, in get_command_part
AttributeError: 'dict' object has no attribute '_get_object_id'
Please help me solve this issue.
Thanks in advance
PS: I am using Apache Spark 1.2
Kafka is a messaging and integration platform well suited to Spark Streaming: it acts as the central hub for real-time streams of data, which are then processed with complex algorithms in Spark Streaming.
The kafka-python client is a Python client for Kafka that helps data engineers and data scientists process data streams and send them to Kafka for consumption or storage, improving data integration.
For stream processing, Kafka can be used alongside real-time streaming frameworks such as Spark and Storm.
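As a rough illustration, here is a minimal kafka-python producer sketch for feeding such a topic; the broker address localhost:9092 and the topic name test are placeholders, not values taken from the question above.

# Minimal kafka-python producer sketch; broker address and topic name are placeholders.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

# Send a few test lines to the topic; values must be bytes.
for line in ["hello kafka", "hello spark streaming"]:
    producer.send("test", line.encode("utf-8"))

# Block until all buffered messages have actually been delivered, then close.
producer.flush()
producer.close()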
I faced the same issue and fixed it by adding the kafka-assembly package:
bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 ~/py/sparkjob.py
Adjust the package coordinates according to your Spark and Kafka versions.
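For reference, a minimal sketch of what a job like ~/py/sparkjob.py could contain once the connector package is on the classpath; this assumes the receiver-based spark-streaming-kafka-0-8 API, and the ZooKeeper address localhost:2181 and topic name test are placeholders.

# Hypothetical minimal Kafka word count for the spark-streaming-kafka-0-8 connector.
from __future__ import print_function

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    sc = SparkContext(appName="KafkaWordCount")
    ssc = StreamingContext(sc, 2)  # 2-second micro-batches

    # Receiver-based stream: (ZooKeeper quorum, consumer group, {topic: partitions}).
    kvs = KafkaUtils.createStream(ssc, "localhost:2181",
                                  "spark-streaming-consumer", {"test": 1})

    counts = (kvs.map(lambda kv: kv[1])              # keep only the message value
                 .flatMap(lambda line: line.split(" "))
                 .map(lambda word: (word, 1))
                 .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

With the addresses hard-coded as in this sketch, the --packages command above needs no extra arguments.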