I am trying to multiprocess a list of RDDs as follows:

from pyspark.context import SparkContext
from multiprocessing import Pool

def square(rdd_list):
    def _square(i):
        return i * i
    return rdd_list.map(_square)

sc = SparkContext('local', 'Data_Split')
data = sc.parallelize([1, 2, 3, 4, 5, 6])
dataCollection = [data, data, data]

p = Pool(processes=2)
result = p.map(square, dataCollection)

print(result[0].collect())
I expect a list of RDDs as output, each element containing the squared elements of data.
But running the code produces the following error:
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
My questions are:

1) Why doesn't the code work as expected? How can I fix it?

2) Will I gain any performance improvement (i.e., a shorter runtime) by using p.map (Pool) instead of a simple map over my list of RDDs?
It's because when you use multiprocessing, the RDDs have to be serialized/pickled before they are sent to the other processes. An RDD is only a driver-side handle to distributed data, not the data itself, so Spark performs a check whenever an attempt to serialize an RDD is made and throws that error.
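As a minimal sketch of a fix (reusing the names from the question's code): do the mapping over the list of RDDs on the driver itself. rdd.map is lazy and merely records the transformation, so this loop is cheap, and the actual squaring still runs in parallel on Spark's executors when collect() is called.

from pyspark.context import SparkContext

def square(rdd):
    # Only the lambda is shipped to the executors; the RDD object
    # itself is never pickled, so the SPARK-5063 check is not triggered.
    return rdd.map(lambda i: i * i)

sc = SparkContext('local', 'Data_Split')
data = sc.parallelize([1, 2, 3, 4, 5, 6])
dataCollection = [data, data, data]

# A plain driver-side comprehension replaces Pool.map.
result = [square(rdd) for rdd in dataCollection]

print(result[0].collect())  # [1, 4, 9, 16, 25, 36]

This also suggests the answer to question 2: Pool.map cannot shorten the runtime here, because the map transformation only builds the lineage graph; the parallel work is done by Spark's executors, not by the driver process.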