
Structured Streaming with Kafka 2.1 -> Zeppelin 0.8 -> Spark 2.4: Spark does not use the jar

I have a Kafka 2.1 message broker and want to do some processing on the message data in Spark 2.4. I want to use Zeppelin 0.8.1 notebooks for rapid prototyping.

I downloaded spark-streaming-kafka-0-10_2.11.jar, which is necessary for structured streaming (http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html), and added it as a "Dependencies" artifact to Zeppelin's "spark" interpreter (which also handles the %pyspark paragraphs). I restarted this interpreter (and also Zeppelin).

I also loaded the jar in a first notebook paragraph (although I first thought that this should not be necessary):

%dep
z.load("/usr/local/analyse/jar/spark-streaming-kafka-0-10_2.11.jar")

res0: org.apache.zeppelin.dep.Dependency = org.apache.zeppelin.dep.Dependency@2b65d5

I got no error, so the loading seems to have worked. Now I want to test it. The Kafka server runs on the same machine using the port shown below, and there is also a topic "test":

%pyspark
# Subscribe to a topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .load()

But I get the error:

Fail to execute line 6: .option("subscribe", "test") \
Traceback (most recent call last):
  File "/usr/local/analyse/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/analyse/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o120.load.
: org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-312826888257172599.py", line 380, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<string>", line 6, in <module>
  File "/usr/local/analyse/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 400, in load
    return self._df(self._jreader.load())
  File "/usr/local/analyse/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/analyse/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;'

This surprised me, since at least one of the two adjustments (the interpreter configuration or the direct loading) should have worked.

I also tried spark-submit --jars /usr/local/analyse/jar/spark-streaming-kafka-0-10_2.11.jar on the console, but this seems to work only if I also submit a program.

So I also copied spark-streaming-kafka-0-10_2.11.jar to /usr/local/analyse/spark/jars/, where all of Spark's other jars are. But even after a restart (of Spark and Zeppelin), I still get the same error.

In the meantime I found out that I can view Spark's environment variables in the web browser, and there I find spark-streaming-kafka-0-10_2.11.jar in the section "Classpath Entries", both with the source "System Classpath" and as "Added By User" (the latter seems to be the artifact from Zeppelin's interpreter section). So it seems that my first two attempts should have worked.

asked Oct 16 '22 by tardis
1 Answer

The first issue is that you have downloaded the package for Spark Streaming but are trying to create a Structured Streaming object (with readStream). Keep in mind that Spark Streaming and Spark Structured Streaming are two different things and need to be treated differently.

For Structured Streaming you need to download the package spark-sql-kafka-0-10_2.11 and its dependencies kafka-clients, slf4j-api, snappy-java, lz4-java and unused. Your %dep paragraph should look like this to load all the required packages:

%dep
z.load("/tmp/spark-sql-kafka-0-10_2.11-2.4.0.jar")
z.load("/tmp/kafka-clients-2.0.0.jar")
z.load("/tmp/lz4-java-1.4.0.jar")
z.load("/tmp/snappy-java-1.1.7.1.jar")
z.load("/tmp/unused-1.0.0.jar")
z.load("/tmp/slf4j-api-1.7.16.jar")
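Once those jars are on the classpath, the original %pyspark paragraph should stop failing with the "Failed to find data source: kafka" error. As a minimal end-to-end sketch (assuming the broker on localhost:9092 and the topic "test" from the question; in a Zeppelin %pyspark paragraph the spark session already exists, so the builder call just reuses it):

```python
from pyspark.sql import SparkSession

# Reuses the existing session in Zeppelin; creates one when run standalone.
spark = SparkSession.builder.appName("kafka-test").getOrCreate()

# Subscribe to the topic (same options as in the question).
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .load())

# Kafka delivers key and value as binary; cast them before processing.
query = (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
         .writeStream
         .format("console")
         .start())

query.awaitTermination()
```

Note that z.load also accepts Maven coordinates (e.g. z.load("org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0")), which resolves the transitive dependencies automatically instead of loading each jar by hand.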
answered Oct 20 '22 by cronoik