PySpark: Using Object in RDD

I am currently learning Python and want to use it with Spark. I have this very simple (and useless) script:

import sys
from pyspark import SparkContext

class MyClass:
    def __init__(self, value):
        self.v = str(value)

    def addValue(self, value):
        self.v += str(value)

    def getValue(self):
        return self.v

if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage CC")
        exit(-1)

    data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
    sc = SparkContext(appName="WordCount")
    d = sc.parallelize(data)
    inClass = d.map(lambda input: (input, MyClass(input)))
    reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue))
    print(reduzed.collect())

When executing it with

spark-submit CustomClass.py

...the following error is thrown (output shortened):

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream
    for obj in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1728, in add_shuffle_key
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 415, in dumps
    return pickle.dumps(obj, protocol)
PicklingError: Can't pickle __main__.MyClass: attribute lookup __main__.MyClass failed
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)...

To me, the statement

PicklingError: Can't pickle __main__.MyClass: attribute lookup __main__.MyClass failed

seems to be the important part. It means that the class instances can't be serialized, right? Do you know how to solve this issue?

Thanks and regards

asked Nov 10 '15 by Daniel

1 Answer

There are a number of issues:

  • If you put MyClass in a separate file, it can be pickled. Failing to pickle classes defined in __main__ is a common problem for Python programs that use pickle. It is simple to solve here: move MyClass into its own module and import it with from myclass import MyClass (see the sketch after this list). Normally dill can fix such issues (as in import dill as pickle), but it didn't work for me here.
  • Once that is solved, your reduce still doesn't work, because calling addValue returns None (there is no return statement), not an instance of MyClass. You need to change addValue to return self.
  • Finally, the lambda needs to call getValue, so it should be a.addValue(b.getValue()).
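
To illustrate the first point, here is a minimal sketch (plain pickle, no Spark) of the failure mode: pickle stores an instance by its class's qualified name, and dumping fails when that name cannot be looked up in __main__. Deleting the class below simulates a Spark worker, where __main__ is PySpark's worker script rather than your driver script, so __main__.MyClass does not exist there:

import pickle

class MyClass:
    def __init__(self, value):
        self.v = str(value)

obj = MyClass(1)
del MyClass  # simulate a process whose __main__ does not define MyClass

try:
    pickle.dumps(obj)
except (pickle.PicklingError, AttributeError) as e:  # exception type varies across Python versions
    print(e)  # Can't pickle ... attribute lookup MyClass on __main__ failed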

Together: myclass.py

class MyClass:
    def __init__(self, value):
        self.v = str(value)

    def addValue(self, value):
        self.v += str(value)
        return self

    def getValue(self):
        return self.v
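
With return self in place, addValue hands back the instance it was called on, so the result of one reduce step is again a MyClass that the next step can fold into. A quick standalone check (no Spark needed):

from myclass import MyClass

c = MyClass(1)
# each addValue returns the same instance, so the calls chain
print(c.addValue(2).addValue(3).getValue())  # prints 123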

main.py

import sys
from pyspark import SparkContext
from myclass import MyClass

if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage CC")
        exit(-1)

    data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
    sc = SparkContext(appName="WordCount")
    d = sc.parallelize(data)
    inClass = d.map(lambda input: (input, MyClass(input)))
    reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue()))
    print(reduzed.collect())
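
Note that collect() returns (key, MyClass) pairs, so the print above shows object reprs rather than the concatenated strings. To inspect the actual values, you could map each one through getValue() first (key order in the output may vary):

print(reduzed.mapValues(lambda c: c.getValue()).collect())
# e.g. [(1, '11'), (2, '222'), (3, '3333'), (4, '444'), (5, '55'), (7, '7')]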
answered Oct 23 '22 by Kevin S