pyspark addPyFile to add zip of .py files, but module still not found

Using addPyFile() does not seem to be adding the desired files to the Spark job nodes (I'm new to Spark, so I may be missing some basic usage knowledge here).

I'm attempting to run a script using PySpark and was seeing errors that certain modules could not be found for import. I have never used Spark before, but other posts (from the package in question, https://github.com/cerndb/dist-keras/issues/36#issuecomment-378918484, and https://stackoverflow.com/a/39779271/8236733) recommended zipping the module and adding it to the Spark job via sparkContext.addPyFile("mymodulefiles.zip"), yet I am still getting the error. The relevant code snippets are...

from distkeras.trainers import *
from distkeras.predictors import *
from distkeras.transformers import *
from distkeras.evaluators import *
from distkeras.utils import *

(where the package I'm importing here can be found at https://github.com/cerndb/dist-keras),

from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)  # master='yarn-client'
conf.set("spark.executor.cores", `num_cores`)  # backticks are Python 2 shorthand for repr()
conf.set("spark.executor.instances", `num_executors`)
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

if using_spark_2:
    from pyspark.sql import SparkSession

    # note: despite the name, sc here is a SparkSession, not a SparkContext
    sc = SparkSession.builder.config(conf=conf) \
            .appName(application_name) \
            .getOrCreate()
    sc.sparkContext.addPyFile("/home/me/Downloads/distkeras.zip")  # see https://github.com/cerndb/dist-keras/issues/36#issuecomment-378918484 and https://forums.databricks.com/answers/10207/view.html
    print(sc.version)

(distkeras.zip being a zip of this directory: https://github.com/cerndb/dist-keras/tree/master/distkeras), and

transformer = OneHotTransformer(output_dim=nb_classes, input_col="label_index", output_col="label")
dataset = transformer.transform(dataset)

"""throwing error...
.....
  File "/opt/mapr/spark/spark-2.1.0/python/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj)
ImportError: No module named distkeras.utils

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
.....
"""

From the docs and examples I could find (http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.SparkContext.addPyFile and https://forums.databricks.com/questions/10193/the-proper-way-to-add-in-dependency-py-files.html), the code above seems like it should work to me (again, I have never used Spark before). Does anyone have any idea what I'm doing wrong here? Is there any more info I could post that would be useful for debugging?
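For reference, here is a minimal diagnostic I could run against the session above (just a sketch assuming the same sc object from my snippet, not something taken from the docs) to see what the executors actually have on sys.path after the addPyFile call:

def worker_sys_path(_):
    # runs inside a task on an executor; the added zip should appear on its sys.path
    import sys
    return sys.path

print(sc.sparkContext.parallelize([0], 1).map(worker_sys_path).collect())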

asked Jul 20 '18 by lampShadesDrifter


2 Answers

Fixed the problem. Admittedly, the solution is not totally Spark-related, but I'm leaving the question posted for the sake of others who may have a similar problem, since the given error message did not make my mistake clear from the start.

TLDR: Make sure the contents of the zip file being loaded are structured and named the way your code expects (so each package directory should include an __init__.py).


The package I was trying to load into the Spark context via the zip was of the form

mypkg
    file1.py
    file2.py
    subpkg1
        file11.py
    subpkg2
        file21.py

My zip, when inspected by running less mypkg.zip, showed

file1.py file2.py subpkg1 subpkg2

So two things were wrong here.

  1. I was not zipping the top-level directory (mypkg, the package name the code expects to import).
  2. I was not zipping the lower-level directories (subpkg1 and subpkg2), i.e. not zipping recursively.

Solved with zip -r mypkg.zip mypkg (run from the directory containing mypkg).

More specifically, I had to make two zip files (a quick way to check each archive's layout is sketched after this list):

  1. for the dist-keras package:

    cd dist-keras; zip -r distkeras.zip distkeras

see https://github.com/cerndb/dist-keras/tree/master/distkeras

  2. for the keras package used by distkeras (which is not installed across the cluster):

    cd keras; zip -r keras.zip keras

see https://github.com/keras-team/keras/tree/master/keras
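A quick way to sanity-check each archive's layout before handing it to addPyFile (not something I originally ran, just a sketch using Python's standard zipfile module; the paths are the ones used below): every entry name should start with the package directory, e.g. distkeras/__init__.py, rather than being a bare module file at the root.

import zipfile

for path in ("/home/me/projects/keras-projects/exploring-keras/keras-dist_test/dist-keras/distkeras.zip",
             "/home/me/projects/keras-projects/exploring-keras/keras-dist_test/keras/keras.zip"):
    with zipfile.ZipFile(path) as zf:
        print(path)
        for name in zf.namelist():
            # expect names like distkeras/__init__.py or keras/models.py
            print("    " + name)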

So declaring the Spark session looked like:

conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)  #master='yarn-client'
conf.set("spark.executor.cores", `num_cores`)
conf.set("spark.executor.instances", `num_executors`)
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

# Check if the user is running Spark 2.0 +
if using_spark_2:
    from pyspark.sql import SparkSession

    sc = SparkSession.builder.config(conf=conf) \
            .appName(application_name) \
            .getOrCreate()
    sc.sparkContext.addPyFile("/home/me/projects/keras-projects/exploring-keras/keras-dist_test/dist-keras/distkeras.zip")
    sc.sparkContext.addPyFile("/home/me/projects/keras-projects/exploring-keras/keras-dist_test/keras/keras.zip")
    print(sc.version)
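To confirm that the executors can actually import the shipped package, a small check like this can be run right after the addPyFile calls (a sketch, not part of the original fix; it simply forces an import inside a task):

def check_import(_):
    # executed on an executor; raises ImportError if the zip is missing or mis-structured
    import distkeras.utils
    return distkeras.utils.__file__

print(sc.sparkContext.parallelize([0], 1).map(check_import).collect())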
answered Oct 18 '22 by lampShadesDrifter


If your module is structured as below:

myModule
    __init__.py
    spark1.py
    spark2.py

Don't go inside the myModule folder and add its contents to the zip; that produces the error you mentioned.

Instead, go to the directory that contains myModule, right-click the myModule folder itself, add it to a zip, and give the archive whatever name you like.

The idea is that when Spark extracts your zip, a myModule folder should exist with the same name and hierarchy.
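If you prefer to build the archive programmatically instead of via right-click, here is a rough sketch of the same idea using Python's zipfile module (the paths are purely illustrative): zip from the parent directory so that myModule itself is the top-level entry.

import os
import zipfile

parent = "/path/containing"  # illustrative: the directory that holds myModule/
with zipfile.ZipFile("myModule.zip", "w") as zf:
    for root, _, files in os.walk(os.path.join(parent, "myModule")):
        for fname in files:
            full = os.path.join(root, fname)
            # store entries relative to the parent so they appear as myModule/...
            zf.write(full, arcname=os.path.relpath(full, parent))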

answered Oct 18 '22 by Tanaji Sutar