The following is my PySpark startup snippet, which is pretty reliable (I've been using it a long time). Today I added the two Maven Coordinates shown in the spark.jars.packages
option (effectively "plugging" in Kafka support). Now that normally triggers dependency downloads (performed by Spark automatically):
import sys, os, multiprocessing
from pyspark.sql import DataFrame, DataFrameStatFunctions, DataFrameNaFunctions
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as sFn
from pyspark.sql.types import *
from pyspark.sql.types import Row
# ------------------------------------------
# Note: Row() in .../pyspark/sql/types.py
# isn't included in '__all__' list(), so
# we must import it by name here.
# ------------------------------------------
num_cpus = multiprocessing.cpu_count() # Number of CPUs for SPARK Local mode.
os.environ.pop('SPARK_MASTER_HOST', None) # Since we're using pip/pySpark these three ENVs
os.environ.pop('SPARK_MASTER_POST', None) # aren't needed; and we ensure pySpark doesn't
os.environ.pop('SPARK_HOME', None) # get confused by them, should they be set.
os.environ.pop('PYTHONSTARTUP', None) # Just in case pySpark 2.x attempts to read this.
os.environ['PYSPARK_PYTHON'] = sys.executable # Make SPARK Workers use same Python as Master.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/jre' # Oracle JAVA for our pip/python3/pySpark 2.4 (CDH's JRE won't work).
JARS_IVY_REPO = '/home/jdoe/SPARK.JARS.REPO.d/'
# ======================================================================
# Maven Coordinates for JARs (and their dependencies) needed to plug
# extra functionality into Spark 2.x (e.g. Kafka SQL and Streaming)
# A one-time internet connection is necessary for Spark to autimatically
# download JARs specified by the coordinates (and dependencies).
# ======================================================================
spark_jars_packages = ','.join(['org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0',
'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0',])
# ======================================================================
spark_conf = SparkConf()
spark_conf.setAll([('spark.master', 'local[{}]'.format(num_cpus)),
('spark.app.name', 'myApp'),
('spark.submit.deployMode', 'client'),
('spark.ui.showConsoleProgress', 'true'),
('spark.eventLog.enabled', 'false'),
('spark.logConf', 'false'),
('spark.jars.repositories', 'file:/' + JARS_IVY_REPO),
('spark.jars.ivy', JARS_IVY_REPO),
('spark.jars.packages', spark_jars_packages), ])
spark_sesn = SparkSession.builder.config(conf = spark_conf).getOrCreate()
spark_ctxt = spark_sesn.sparkContext
spark_reader = spark_sesn.read
spark_streamReader = spark_sesn.readStream
spark_ctxt.setLogLevel("WARN")
However the plugins aren't downloading and/or loading when I run the snippet (e.g. ./python -i init_spark.py
), as they should.
This mechanism used to work, but then stopped. What am I missing?
Thank you in advance!
This is the kind of post where the QUESTION will be worth more than the ANSWER, because the code above works but isn't anywhere to be found in Spark 2.x documentation or examples.
The above is how I've programmatically added functionality to Spark 2.x by way of Maven Coordinates. I had this working but then it stopped working. Why?
When I ran the above code in a jupyter notebook
, the notebook had -- behind the scenes -- already run that identical code snippet by way of my PYTHONSTARTUP
script. That PYTHONSTARTUP
script has the same code as the above, but omits the maven coordinates (by intent).
Here, then, is how this subtle problem emerges:
spark_sesn = SparkSession.builder.config(conf = spark_conf).getOrCreate()
Because a Spark Session already existed, the above statement simply reused that existing session (.getOrCreate()), which did not have the jars/libraries loaded (again, because my PYTHONSTARTUP script intentionally omits them). This is why it is a good idea to put print statements in PYTHONSTARTUP scripts (which are otherwise silent).
In the end, I simply forgot to do this: $ unset PYTHONSTARTUP
before starting the JupyterLab / Notebook
daemon.
I hope the Question helps others because that's how to programmatically add functionality to Spark 2.x (in this case Kafka). Note that you'll need an internet connection for the one-time download of the specified jars and recursive dependencies from Maven Central.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With