
Bundling Python3 packages for PySpark results in missing imports

I'm trying to run a PySpark job that depends on certain Python 3 libraries. I know I can install these libraries on the Spark cluster, but since I'm reusing the cluster for multiple jobs, I'd rather bundle all dependencies and pass them to each job via the --py-files directive.

To do this I use:

pip3 install -r requirements.txt --target ./build/dependencies
cd ./build/dependencies
zip -qrm ../dependencies.zip .

This effectively zips all the code from the required packages so that it sits at the root level of the archive.
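
As an optional sanity check (not part of the original workflow), you can list the top-level entries of the archive to confirm the package directories really sit at the root; the path below assumes you run it from the project root:

# Optional sanity check: list the top-level entries of the bundle.
import zipfile

with zipfile.ZipFile('build/dependencies.zip') as zf:
    top_level = sorted({name.split('/')[0] for name in zf.namelist()})

print(top_level)  # expect package directories such as 'pandas' and 'numpy'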


In my main.py I can import the dependencies

if os.path.exists('dependencies.zip'):
    sys.path.insert(0, 'dependencies.zip')

And also add the .zip to my Spark Context

sc.addPyFile('dependencies.zip')

So far so good.
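
Put together, the setup in main.py looks roughly like this (a sketch; the SparkContext creation isn't shown in the original question, so that part is illustrative):

# Rough sketch of the setup in main.py; the SparkContext creation is
# illustrative, the sys.path and addPyFile lines are from the snippets above.
import os
import sys

if os.path.exists('dependencies.zip'):
    sys.path.insert(0, 'dependencies.zip')   # make bundled packages importable on the driver

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setAppName('example-job'))
sc.addPyFile('dependencies.zip')             # ship the bundle to the executors as well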

But for some reason this devolves into some kind of dependency hell on the Spark cluster.

E.g., running

spark-submit --py-files dependencies.zip main.py

where in main.py (or a class) I want to use pandas, triggers this error:

Traceback (most recent call last):
  File "/Users/tomlous/Development/Python/enrichers/build/main.py", line 53, in <module>
    job_module = importlib.import_module('spark.jobs.%s' % args.job_name)
  ...
  File "<frozen importlib._bootstrap>", line 978, in _gcd_import
  File "<frozen importlib._bootstrap>", line 961, in _find_and_load
  File "<frozen importlib._bootstrap>", line 950, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 646, in _load_unlocked
  File "<frozen importlib._bootstrap>", line 616, in _load_backward_compatible
  File "dependencies.zip/spark/jobs/classify_existence.py", line 9, in <module>
  File "dependencies.zip/enrich/existence.py", line 3, in <module>
  File "dependencies.zip/pandas/__init__.py", line 19, in <module>
ImportError: Missing required dependencies ['numpy']

Looking at pandas' __init__.py I see something like __import__('numpy')

So I assume numpy is not loaded.

But if I change my code to explicitly call numpy functions, it actually finds numpy, but not some of its dependencies:

import numpy as np
a = np.array([1, 2, 3])

The code returns

Traceback (most recent call last):
  File "dependencies.zip/numpy/core/__init__.py", line 16, in <module>
ImportError: cannot import name 'multiarray'
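
multiarray is one of NumPy's compiled C-extension modules, and Python's zipimport can only load pure-Python modules from an archive, so the shared libraries inside the zip never get loaded. A quick illustrative check (my addition, not part of the original post) shows how many compiled modules sit in the bundle:

# Count the compiled extension modules bundled in dependencies.zip;
# zipimport cannot load these directly from inside the archive.
import zipfile

with zipfile.ZipFile('dependencies.zip') as zf:
    compiled = [name for name in zf.namelist() if name.endswith(('.so', '.pyd'))]

print(len(compiled), 'compiled extension modules, e.g.', compiled[:3])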

So my question is:

How should I bundle Python 3 libraries with my Spark job in a way that I don't have to pip3 install every possible library on the Spark cluster?

Asked Feb 13 '18 by Tom Lous



1 Answer

Update: There's a cohesive repo that includes a sample project that does this quite wonderfully. You should take a look, especially if my example below doesn't work for you. The repo is here: https://github.com/massmutual/sample-pyspark-application and includes this example for running on YARN: https://github.com/massmutual/sample-pyspark-application/blob/master/setup-and-submit.sh that expects you to first export several environment variables. (The values I provided are specific to EMR, so your values might be different.)

export HADOOP_CONF_DIR="/etc/hadoop/conf"
export PYTHON="/usr/bin/python3"
export SPARK_HOME="/usr/lib/spark"
export PATH="$SPARK_HOME/bin:$PATH"

As mentioned here: I can't seem to get --py-files on Spark to work, it is necessary to use something like virtualenv (or perhaps conda) to avoid problems with the compiled C libraries in Python packages (such as NumPy). Those libraries are built against the underlying hardware architecture, so they fail to port to other machines in the cluster, due to hard links in the dependencies and/or task nodes that may have different hardware from the master node instance.

Some of the differences between --archives and --py-files are discussed here: Shipping and using virtualenv in a pyspark job

I suggest using --archives with virtualenv to provide the zipped file that contains your package dependencies, which avoids some of the problems mentioned above.

For example, from an Amazon Elastic MapReduce (EMR) cluster, while ssh'd into the master instance, I was able to successfully use spark-submit to execute a test Python script from a virtualenv environment like this:

pip-3.4 freeze | egrep -v sagemaker > requirements.txt
# Above line is just in case you want to port installed packages across environments.
virtualenv -p python3 spark_env3
virtualenv -p python3 --relocatable spark_env3
source spark_env3/bin/activate
sudo pip-3.4 install -U pandas boto3 findspark jaydebeapi
# Note that the above libraries weren't required for the test script, but I'm showing how you can add additional dependencies if needed.
sudo pip-3.4 install -r requirements.txt
# The above line is just to show how you can load from a requirements file if needed.
cd spark_env3
# We must cd into the directory before we zip it for Spark to find the resources. 
zip -r ../spark_env3_inside.zip *
# Be sure to cd back out after building the zip file. 
cd ..
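
Before shipping the archive, it can help to confirm that the interpreter and installed packages actually made it into the zip; this check is my own addition, not part of the original steps:

# Optional check: confirm the virtualenv's interpreter and packages are inside the archive.
import zipfile

with zipfile.ZipFile('spark_env3_inside.zip') as zf:
    names = zf.namelist()

print(any(name.endswith('bin/python3') for name in names))          # interpreter present?
print(sum('site-packages' in name for name in names), 'entries under site-packages')

With the archive in place, the job can be submitted as follows: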

PYSPARK_PYTHON=./spark_env3/bin/python3 spark-submit \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./spark_env3/bin/python3 \
  --master yarn-cluster \
  --archives /home/hadoop/spark_env3_inside.zip#spark_env3 \
  test_spark.py

Note that the # near the end of the --archives line above is not a comment. It is a directive to spark-submit, as explained here: Upload zip file using --archives option of spark-submit on yarn

The test script that I'm running comes from this article, which talks about using conda instead of virtualenv for running pyspark jobs: http://quasiben.github.io/blog/2016/4/15/conda-spark/

It contains this code for the test_spark.py script:

# test_spark.py
import os
import sys
from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf()
conf.setAppName("get-hosts")

sc = SparkContext(conf=conf)

def noop(x):
    # Runs on each executor: report the host name along with the task's sys.path and environment.
    import socket
    import sys
    return socket.gethostname() + ' '.join(sys.path) + ' '.join(os.environ)

rdd = sc.parallelize(range(1000), 100)
hosts = rdd.map(noop).distinct().collect()
print(hosts)

If you want some background information about using virtualenv to execute a pyspark job, as @Mariusz mentioned already, there is a useful example in this blog post: https://henning.kropponline.de/2016/09/17/running-pyspark-with-virtualenv/ (though it doesn't explain some of the subtleties that I clarified with the other links that I provided).

There is also an additional example in the answer post provided here: Elephas not loaded in PySpark: No module named elephas.spark_model

There's yet another example available here: https://community.hortonworks.com/articles/104947/using-virtualenv-with-pyspark.html

Answered Oct 24 '22 by devinbost