Adding custom jars to PySpark in Jupyter notebook

I am using a Jupyter notebook with PySpark, based on the following Docker image: Jupyter all-spark-notebook

Now I would like to write a PySpark streaming application that consumes messages from Kafka. In the Spark-Kafka Integration guide they describe how to deploy such an application using spark-submit (it requires linking an external jar; the explanation is in 3. Deploying). But since I am using a Jupyter notebook, I never actually run the spark-submit command; I assume it gets run behind the scenes when I press execute.

In the spark-submit command you can specify some parameters, one of them being --jars, but it is not clear to me how I can set this parameter from the notebook (or externally via environment variables?). I am assuming I can link this external jar dynamically via the SparkConf or the SparkContext object. Does anyone have experience with how to perform the linking properly from the notebook?

asked Mar 11 '16 by DDW

People also ask

How do I add jars to PySpark?

Use the --jars option. To add JARs to a Spark job, the --jars option can be used to include JARs on the Spark driver and executor classpaths. If multiple JAR files need to be included, separate them with commas. The following is an example: spark-submit --jars /path/to/jar/file1,/path/to/jar/file2 ...


4 Answers

I've managed to get it working from within the Jupyter notebook which is running from the all-spark container.

I start a Python 3 notebook in JupyterHub and override the PYSPARK_SUBMIT_ARGS variable as shown below. The Kafka consumer library was downloaded from the Maven repository and put in my home directory /home/jovyan:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = \
    '--jars /home/jovyan/spark-streaming-kafka-assembly_2.10-1.6.1.jar pyspark-shell'

import pyspark
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext

sc = pyspark.SparkContext()
ssc = StreamingContext(sc, 1)

broker = "<my_broker_ip>"
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test1"],
                        {"metadata.broker.list": broker})
directKafkaStream.pprint()
ssc.start()

Note: don't forget the pyspark-shell at the end of the environment variable!

Extension: if you want to include code from spark-packages, you can use the --packages flag instead. An example of how to do this in the all-spark-notebook can be found here
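
A minimal sketch of the --packages variant, set from within the notebook in the same way as above; the Maven coordinates below are only an assumed example and should match your Spark and Scala versions:

import os

# Assumed coordinates for illustration; use the artifact matching your
# Spark/Scala versions. Spark resolves and downloads the package (and its
# dependencies) itself, so no local jar file is needed.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-streaming-kafka_2.10:1.6.1 pyspark-shell'
)

import pyspark
sc = pyspark.SparkContext()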

answered by DDW

Indeed, there is a way to link it dynamically via the SparkConf object when you create the SparkSession, as explained in this answer:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("My App") \
    .config("spark.jars", "/path/to/jar.jar,/path/to/another/jar.jar") \
    .getOrCreate()
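
One caveat, as a hedged note rather than part of the original answer: spark.jars is only applied when the underlying SparkContext is first created, so if the notebook has already started a session you may need to stop it before rebuilding. A minimal sketch:

from pyspark.sql import SparkSession

# Minimal sketch: spark.jars only takes effect when a new context is created,
# so stop any session the notebook may already have started.
active = SparkSession.getActiveSession()  # available in recent PySpark versions
if active is not None:
    active.stop()

spark = (
    SparkSession.builder
    .appName("My App")
    .config("spark.jars", "/path/to/jar.jar,/path/to/another/jar.jar")
    .getOrCreate()
)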
answered by Nandan Rao


You can run your Jupyter notebook with the pyspark command by setting the relevant environment variables:

export PYSPARK_DRIVER_PYTHON=jupyter
export IPYTHON=1
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --port=XXX --ip=YYY"

where XXX is the port you want to use to access the notebook and YYY is the IP address.

Now simply run pyspark and add --jars as a switch, the same as you would with spark-submit.

answered by Assaf Mendelson


In case someone is in the same situation as me: I tried all of the above solutions and none of them worked for me. What I'm trying to do is use Delta Lake in a Jupyter notebook.

Finally, I was able to use from delta.tables import * by calling SparkContext.addPyFile("/path/to/your/jar.jar") first. Although the official Spark docs only mention adding .zip or .py files, I tried a .jar and it worked perfectly.
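
A minimal sketch of that workflow, keeping the placeholder jar path from above and assuming the jar bundles the delta Python module:

import pyspark

# Minimal sketch: addPyFile ships the file to the executors and adds it to the
# Python search path; a .jar works because it is a zip archive that Python can
# import from, which is what makes the bundled delta module importable.
sc = pyspark.SparkContext.getOrCreate()
sc.addPyFile("/path/to/your/jar.jar")

from delta.tables import *  # now resolves against the added jar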

answered by Dd__Mad