I am currently learning Python and want to use it with Spark. I have this very simple (and useless) script:
import sys
from pyspark import SparkContext
class MyClass:
    def __init__(self, value):
        self.v = str(value)
    def addValue(self, value):
        self.v += str(value)
    def getValue(self):
        return self.v
if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage CC")
        exit(-1)
    data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
    sc = SparkContext(appName="WordCount")
    d = sc.parallelize(data)
    inClass = d.map(lambda input: (input, MyClass(input)))
    reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue))
    print(reduzed.collect())
When executing it with
spark-submit CustomClass.py
the following error is thrown (output shortened):
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream
    for obj in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1728, in add_shuffle_key
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 415, in dumps
    return pickle.dumps(obj, protocol)
PicklingError: Can't pickle __main__.MyClass: attribute lookup __main__.MyClass failed
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)...
To me, the statement
PicklingError: Can't pickle __main__.MyClass: attribute lookup __main__.MyClass failed
seems to be important. It means that the class instances can't be serialized, right? Do you know how to solve this issue?
Thanks and regards
There are a number of issues:
1. MyClass has to live in a separate file before it can be pickled. This is a common problem for many Python uses of pickle, and it is simple to solve: move MyClass into its own module and use from myclass import MyClass (see the sketch after this list). Normally dill can fix these issues (as in import dill as pickle), but it didn't work for me here.
2. addValue returns None (there is no return statement), not an instance of MyClass. You need to change addValue to return self.
3. The lambda needs to call getValue, so it should be a.addValue(b.getValue()).
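Why the separate module helps: pickle records an instance as a reference to its class's module and name, checks at dump time that the class can actually be found under that name, and re-imports the module at load time. On a Spark worker, __main__ is the worker process itself rather than your script, which is why the lookup of __main__.MyClass fails. A minimal sketch of the working case, assuming myclass.py as defined below:
import pickle
from myclass import MyClass

obj = MyClass(1)
payload = pickle.dumps(obj)        # stores the reference "myclass.MyClass", not the class code
restored = pickle.loads(payload)   # re-imports myclass to rebuild the instance
print(restored.getValue())         # prints "1"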
Putting it all together:
myclass.py
class MyClass:
    def __init__(self, value):
        self.v = str(value)
    def addValue(self, value):
        self.v += str(value)
        return self  # return the instance so chained calls and reduceByKey get a MyClass, not None
    def getValue(self):
        return self.v
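With return self in place, calls chain the way the reduce lambda expects. A quick local check, plain Python with no Spark involved:
from myclass import MyClass

a = MyClass(1)
print(a.addValue(2).getValue())  # prints "12"; without return self this would raise AttributeError on None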
main.py
import sys
from pyspark import SparkContext
from myclass import MyClass
if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage CC")
        exit(-1)
    data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
    sc = SparkContext(appName="WordCount")
    d = sc.parallelize(data)
    inClass = d.map(lambda input: (input, MyClass(input)))
    reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue()))  # getValue is now called, and addValue returns the accumulator
    print(reduzed.collect())
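Note that collect() returns (key, MyClass) pairs, so the print shows object reprs like <myclass.MyClass object at 0x...> rather than the concatenated strings. To see the values themselves, one option is to unwrap them with the pair-RDD mapValues method before collecting:
print(reduzed.mapValues(lambda c: c.getValue()).collect())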