 

ModuleNotFoundError because PySpark serializer is not able to locate library folder

I have the following folder structure:

 - libfolder
    - lib1.py
    - lib2.py
 - main.py

main.py imports libfolder.lib1, which in turn imports libfolder.lib2 and others.

It all works fine on my local machine, but after I deploy it to Dataproc I get the following error:

File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 455, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'libfolder'

I have zipped the folder into xyz.zip and run the following command:

spark-submit --py-files=xyz.zip main.py

The serializer is not able to find the location of libfolder. Is there a problem with the way I am packaging my folders?
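For reference, a sketch of one way to build such an archive so that libfolder ends up at the root of the zip (note: libfolder typically also needs an __init__.py, even an empty one, for Python to treat it as an importable package):

# build_zip.py: hypothetical helper, not from the question. Packs libfolder
# at the root of xyz.zip so that "from libfolder import lib1" can resolve.
import os
import zipfile

with zipfile.ZipFile("xyz.zip", "w") as zf:
    for root, _, files in os.walk("libfolder"):
        for name in files:
            path = os.path.join(root, name)
            zf.write(path, arcname=path)  # keep the libfolder/ prefix in the archive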

This issue is similar to this one, but it's unanswered.

Edit: in response to Igor's questions

Running unzip -l on the zip file returns the following:

 - libfolder
    - lib1.py
    - lib2.py
 - main.py

In main.py, lib1 is imported with this statement:

from libfolder import lib1
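
An alternative worth noting (a sketch, not part of the original question): the archive can also be attached programmatically via SparkContext.addPyFile instead of the --py-files flag:

from pyspark import SparkContext

sc = SparkContext(appName="example")
sc.addPyFile("xyz.zip")  # ship the archive to every executor

# import only after the archive has been added
from libfolder import lib1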
asked Dec 20 '18 by Golak Sarangi


2 Answers

This worked for me:

$ cat main.py

from pyspark import SparkContext, SparkConf

from subpkg import sub

conf = SparkConf().setAppName("Shell Count")
sc = SparkContext(conf = conf)

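# Count how many users in /etc/passwd use each login shell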
text_file = sc.textFile("file:///etc/passwd")
counts = text_file.map(lambda line: sub.map(line)) \
    .map(lambda shell: (shell, 1)) \
    .reduceByKey(lambda a, b: sub.reduce(a, b))

counts.saveAsTextFile("hdfs:///count5.txt")

$ cat subpkg/sub.py

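# Return the login shell, the 7th colon-separated field of an /etc/passwd line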
def map(line):
  return line.split(":")[6]

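# Sum two partial counts for the same shell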
def reduce(a, b):
  return a + b

$ unzip -l /tmp/deps.zip 
Archive:  /tmp/deps.zip
  Length      Date    Time    Name
---------  ---------- -----   ----
        0  2019-01-07 14:22   subpkg/
        0  2019-01-07 13:51   subpkg/__init__.py
       79  2019-01-07 14:13   subpkg/sub.py
---------                     -------
       79                     3 files


$ gcloud dataproc jobs submit pyspark --cluster test-cluster main.py --py-files deps.zip
Job [1f0f15108a4149c5942f49513ce04440] submitted.
Waiting for job output...
Hello world!
Job [1f0f15108a4149c5942f49513ce04440] finished successfully.
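
Two details in the listing above are easy to miss when comparing against the question: subpkg/ sits at the root of the archive, and it contains an (empty) subpkg/__init__.py. Since Python can import directly from a zip on sys.path, a quick local sanity check might look like this (a sketch):

import sys
sys.path.insert(0, "/tmp/deps.zip")  # zipimport lets Python import from the archive

from subpkg import sub
print(sub.map("root:x:0:0:root:/root:/bin/bash"))  # prints: /bin/bash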
answered Sep 29 '22 by tix


You don't have to pass the zip file using the --py-files parameter in spark-submit. You just need to import the package where the UDF function resides in your Python project. For example:

from package1.subpackage1.UDFPythonFile1 import UDF1

package1 is a directory that lives alongside your entry-point script (main.py), in the directory from which you execute it:

  • main.py
  • package1
    • subpackage1
      • UDFPythonFile1.py

The advantage of this approach is that you avoid creating a zip file every time. It has worked for me on a Windows machine.
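
As a sketch of what that layout could look like in code (UDF1 and UDFPythonFile1 are the hypothetical names from the answer; each package directory would also need an __init__.py):

# package1/subpackage1/UDFPythonFile1.py (hypothetical)
def UDF1(value):
    # toy transformation for illustration
    return value.upper()

# main.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

from package1.subpackage1.UDFPythonFile1 import UDF1

spark = SparkSession.builder.appName("udf-example").getOrCreate()
upper_udf = udf(UDF1, StringType())
spark.createDataFrame([("hello",)], ["word"]).select(upper_udf("word")).show()

Note that this relies on the package being importable on the driver; on a multi-node cluster the Python files may still need to be shipped to the executors, which is exactly what --py-files does.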

answered Sep 30 '22 by Nikhil Patil