I am working on Kafka streaming and trying to integrate it with Apache Spark. However, when I run it I get the error below.
This is the command I am using.
df_TR = Spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "taxirides").load()
ERROR:
Py4JJavaError: An error occurred while calling o77.load.: java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
How can I resolve this?
NOTE: I am running this in a Jupyter Notebook.
findspark.init('/home/karan/spark-2.1.0-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
Spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
Everything runs fine up to this point (the code above).
df_TR = Spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "taxirides").load()
This is where things go wrong (the code above).
The blog which I am following: https://www.adaltas.com/en/2019/04/18/spark-streaming-data-pipelines-with-structured-streaming/
Kafka Data Source provides a streaming source and a streaming sink for micro-batch and continuous stream processing. Kafka Data Source is part of the spark-sql-kafka-0-10 external module that is distributed with the official distribution of Apache Spark, but it is not included in the CLASSPATH by default.
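To illustrate the sink side, here is a minimal sketch of writing the stream back to Kafka once the spark-sql-kafka-0-10 package is on the classpath; the output topic and checkpoint path are made-up examples, not from the question:

# Sketch only: Kafka as a streaming sink (output topic and checkpoint path are illustrative)
query = (df_TR
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "taxirides-out")
    .option("checkpointLocation", "/tmp/checkpoints/taxirides-out")
    .start())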
Apache Kafka is an open-source stream-processing software platform developed by the Apache Software Foundation, written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. PySpark is an interface for Apache Spark in Python.
Spark supports authenticating against a Kafka cluster using delegation tokens (introduced in Kafka broker 1.1.0). This way the application can be configured via Spark parameters and may not need a JAAS login configuration (Spark can use Kafka’s dynamic JAAS configuration feature).
Kafka’s own configurations can be set via DataStreamReader.option with a kafka. prefix, e.g., stream.option("kafka.bootstrap.servers", "host:port"). For possible Kafka parameters, see the Kafka consumer config docs for parameters related to reading data, and the Kafka producer config docs for parameters related to writing data.
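For example, a sketch mixing Spark-level options with a kafka.-prefixed consumer property (the security.protocol value is illustrative, not from the question):

# Options without the kafka. prefix are handled by Spark itself,
# options with the prefix are passed straight to the underlying Kafka consumer
df = (Spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "taxirides")
    .option("startingOffsets", "earliest")           # Spark option, no prefix
    .option("kafka.security.protocol", "PLAINTEXT")  # forwarded to the Kafka consumer
    .load())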
Edit
Using spark.jars.packages works better than PYSPARK_SUBMIT_ARGS (see the sketch below).
Ref - PySpark - NoClassDefFoundError: kafka/common/TopicAndPartition
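A minimal sketch of that approach, assuming the Scala 2.11 / Spark 2.1.0 build from the question; the package has to be configured before the SparkSession (and its JVM) is created:

from pyspark.sql import SparkSession

# Resolve the Kafka connector at startup via spark.jars.packages
Spark = (SparkSession.builder
    .appName('KafkaStreaming')
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0')
    .getOrCreate())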
It's not clear how you ran the code. If you keep reading the blog, you will see:
spark-submit \
...
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
sstreaming-spark-out.py
It seems you missed adding the --packages flag.
In Jupyter, you could add this:
import os

# PYSPARK_SUBMIT_ARGS must be set before the SparkSession (and its JVM) is created,
# and it has to end with 'pyspark-shell' when Spark is launched from a plain Python kernel
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 pyspark-shell'

# initialize spark
import pyspark, findspark
findspark.init()
Note: the _2.11:2.4.0 suffix needs to align with your Scala and Spark versions. Based on the question, yours should be Spark 2.1.0 (the spark-2.1.0-bin-hadoop2.7 download is built for Scala 2.11).
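Assuming that build, the corrected line would look like this (still a sketch; adjust the coordinate if your versions differ):

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 pyspark-shell'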