
PySpark: Failed to find data source: kafka

I am working on Kafka streaming and trying to integrate it with Apache Spark, but running my code raises the error below.

This is the command I am using.

df_TR = Spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "taxirides").load()

ERROR:

Py4JJavaError: An error occurred while calling o77.load.: java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html

How can I resolve this?

NOTE: I am running this in a Jupyter notebook.

import findspark
findspark.init('/home/karan/spark-2.1.0-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
Spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

Everything runs fine up to this point (the code above).

df_TR = Spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "taxirides").load()

This is where things go wrong (the line above).

The blog which I am following: https://www.adaltas.com/en/2019/04/18/spark-streaming-data-pipelines-with-structured-streaming/

asked Nov 06 '19 by P Kernel


People also ask

What is Kafka data source in spark?

Kafka Data Source provides a streaming source and a streaming sink for micro-batch and continuous stream processing. Kafka Data Source is part of the spark-sql-kafka-0-10 external module that is distributed with the official distribution of Apache Spark, but it is not included in the CLASSPATH by default.
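
A quick way to pick the matching artifact is to check your Spark and Scala versions from a live session. A minimal sketch; the _jvm gateway call is a Py4J internal, so treat it as an assumption:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('VersionCheck').getOrCreate()

# Spark version, e.g. '2.1.0' -> artifact version suffix :2.1.0
print(spark.version)

# Scala version of the running JVM, e.g. 'version 2.11.8' -> artifact suffix _2.11
print(spark.sparkContext._jvm.scala.util.Properties.versionString())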

What is Apache Kafka in Python?

Apache Kafka is an open-source stream-processing software platform developed by the Apache Software Foundation, written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. PySpark is an interface for Apache Spark in Python.

How to authenticate against Kafka cluster in Apache Spark?

Spark supports the following ways to authenticate against a Kafka cluster: delegation tokens (introduced in Kafka broker 1.1.0) and JAAS login configuration. With delegation tokens, the application can be configured via Spark parameters and may not need a JAAS login configuration (Spark can use Kafka’s dynamic JAAS configuration feature).
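
For the JAAS route, broker security settings can be passed straight through as kafka.-prefixed reader options. A minimal sketch assuming a SASL_SSL listener with the PLAIN mechanism; the host, username, and password are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('KafkaAuth').getOrCreate()

# Kafka client security settings pass through with the "kafka." prefix
df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "broker:9093")  # placeholder host
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("kafka.sasl.jaas.config",
              'org.apache.kafka.common.security.plain.PlainLoginModule required '
              'username="user" password="secret";')  # placeholder credentials
      .option("subscribe", "taxirides")
      .load())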

How to set Kafka's own configurations?

Kafka’s own configurations can be set via DataStreamReader.option with the kafka. prefix, e.g. stream.option("kafka.bootstrap.servers", "host:port"). For possible Kafka parameters, see the Kafka consumer config docs for parameters related to reading data, and the Kafka producer config docs for parameters related to writing data.
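
For example, a Kafka consumer setting rides along with the prefix while Spark’s own options take none. A sketch assuming an existing SparkSession named spark; fetch.max.bytes is an arbitrary consumer config chosen for illustration:

# Kafka client options take the "kafka." prefix; Spark options have no prefix
df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("kafka.fetch.max.bytes", "52428800")  # Kafka consumer config, passed through
      .option("subscribe", "taxirides")
      .option("startingOffsets", "earliest")        # Spark option, no prefix
      .load())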


1 Answer

Edit

Using spark.jars.packages works better than PYSPARK_SUBMIT_ARGS

Ref - PySpark - NoClassDefFoundError: kafka/common/TopicAndPartition
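
A minimal sketch of that approach, setting the package on the builder before the session is created (the coordinate assumes Spark 2.4 with Scala 2.11, as in the blog):

import findspark
findspark.init()

from pyspark.sql import SparkSession

# the Kafka source module is fetched from Maven and put on the classpath at startup
spark = (SparkSession.builder
         .appName('KafkaStreaming')
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0")
         .getOrCreate())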


It's not clear how you ran the code. If you keep reading the blog you are following, you will see:

spark-submit \
  ...
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
  sstreaming-spark-out.py

It seems you missed the --packages flag.

In Jupyter, you could add this:

import os

# setup arguments (must be set before the JVM starts; ending the string
# with pyspark-shell is commonly required when launching via pyspark)
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 pyspark-shell'

# initialize spark (findspark.init must run before importing pyspark)
import findspark
findspark.init()
import pyspark

Note: _2.11:2.4.0 needs to align with your Scala and Spark versions. Based on the question, yours should be Spark 2.1.0, so the coordinate would be org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0.
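
Putting it together for the setup in the question, a sketch assuming the Spark 2.1.0 install path from the question and the matching _2.11:2.1.0 artifact (verify the coordinate against your build):

import os

# coordinate matching Spark 2.1.0 / Scala 2.11 (an assumption; verify on Maven Central)
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 pyspark-shell'

import findspark
findspark.init('/home/karan/spark-2.1.0-bin-hadoop2.7')

from pyspark.sql import SparkSession

Spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()

df_TR = (Spark.readStream.format("kafka")
         .option("kafka.bootstrap.servers", "localhost:9092")
         .option("subscribe", "taxirides")
         .load())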

answered by OneCricketeer