I am getting this error but I do not know why. Basically, the error comes from this code:
a = data.mapPartitions(helper(locations))
where data is an RDD and my helper is defined as:
def helper(iterator, locations):
    for x in iterator:
        c = locations[x]
        yield c
(locations is just an array of data points.) I do not see what the problem is, but I am also not the best at PySpark, so can someone please tell me why I am getting 'PipelinedRDD' object is not iterable from this code?
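For what it's worth, mapPartitions expects a function of a single argument (the partition iterator), so a two-argument helper has to be bound to locations first. A minimal sketch of one way to do that with a lambda, assuming the elements of data are valid indices into locations:

def helper(iterator, locations):
    for x in iterator:
        yield locations[x]

# Pass a one-argument function; the lambda closes over locations
a = data.mapPartitions(lambda partition: helper(partition, locations))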
An RDD can be iterated over by using map and lambda functions. I have iterated through a PipelinedRDD using the method below:
lines1 = sc.textFile("\..\file1.csv")
lines2 = sc.textFile("\..\file2.csv")
pairs1 = lines1.map(lambda s: (int(s), 'file1'))
pairs2 = lines2.map(lambda s: (int(s), 'file2'))
pair_result = pairs1.union(pairs2)
pair_result = pair_result.reduceByKey(lambda a, b: a + ',' + b)
result = pair_result.map(lambda l: tuple(l[:1]) + tuple(l[1].split(',')))
result_ll = [list(elem) for elem in result]
===> result_ll = [list(elem) for elem in result]
TypeError: 'PipelinedRDD' object is not iterable
Instead, I replaced the iteration with the map function:
result_ll = result.map(lambda elem: list(elem))
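Note that map is only a transformation and is evaluated lazily; to materialize the list on the driver you still need an action such as collect(). A minimal sketch, assuming the result is small enough to fit in driver memory:

# collect() is the action that runs the job and returns
# the mapped elements as a plain Python list on the driver
result_ll = result.map(lambda elem: list(elem)).collect()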
Hope this helps you modify your code accordingly.
I prefer the answer given to another question, linked below: Can not access Pipelined Rdd in pyspark
You cannot iterate over an RDD directly; you first need to call an action to get your data back to the driver. Quick sample:
>>> test = sc.parallelize([1,2,3])
>>> for i in test:
...     print(i)
...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'RDD' object is not iterable
but you can, for example, use .collect():
>>> for i in test.collect():
...     print(i)
...
1
2
3
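If the dataset is large, collect() pulls everything to the driver at once, which can exhaust memory. Two lighter-weight actions that PySpark provides are take() and toLocalIterator(); a quick sketch in the same REPL style:

>>> test.take(2)                      # returns only the first 2 elements
[1, 2]
>>> for i in test.toLocalIterator():  # streams elements one partition at a time
...     print(i)
...
1
2
3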