I am trying the following code, which adds a number to every row in an RDD and returns a list of RDDs, using PySpark.
from pyspark.context import SparkContext
file = "file:///home/sree/code/scrap/sample.txt"
sc = SparkContext('local', 'TestApp')
data = sc.textFile(file)
splits = [data.map(lambda p : int(p) + i) for i in range(4)]
print splits[0].collect()
print splits[1].collect()
print splits[2].collect()
The content in the input file (sample.txt) is:
1
2
3
I was expecting an output like this (adding the numbers in the rdd with 0, 1, 2 respectively):
[1,2,3]
[2,3,4]
[3,4,5]
whereas the actual output was :
[4, 5, 6]
[4, 5, 6]
[4, 5, 6]
which means that the comprehension used only the value 3 for variable i, irrespective of the range(4).
Why does this behavior happen?
It happens because of Python late binding and is not (Py)Spark specific. i will be looked up when lambda p: int(p) + i is used, not when it is defined. Typically that means when it is called, but in this particular context it is when it is serialized to be sent to the workers.
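The same effect is easy to reproduce without Spark at all. A minimal sketch, plain Python only with nothing PySpark-specific assumed: every lambda built in the comprehension looks up i when it is finally called, and by then the loop has already finished with i == 3.
adders = [lambda p: int(p) + i for i in range(4)]
# Each lambda reads the current value of i at call time, not the value it had
# when the lambda was created, so they all add 3.
print([add("1") for add in adders])   # [4, 4, 4, 4], not [1, 2, 3, 4]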
You can, for example, do something like this:
def f(i):
    # i is bound here, when f is called, so each _f keeps its own copy of it
    def _f(x):
        try:
            return int(x) + i
        except ValueError:
            # skip lines that cannot be parsed as integers
            return None
    return _f
data = sc.parallelize(["1", "2", "3"])
splits = [data.map(f(i)) for i in range(4)]
[rdd.collect() for rdd in splits]
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
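A shorter idiom that gives the same early binding, if you would rather keep a lambda, is to capture i as a default argument. This is a sketch of the same fix applied to your splits, not anything Spark-specific:
# Default values are evaluated when the lambda is defined,
# so every lambda carries its own copy of i.
splits = [data.map(lambda p, i=i: int(p) + i) for i in range(4)]
[rdd.collect() for rdd in splits]
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]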
This is because lambdas capture i by reference, not by value. It has nothing to do with Spark.
You can try this:
a = [(lambda y: (lambda x: y + int(x)))(i) for i in range(4)]
splits = [data.map(a[x]) for x in range(4)]
or, written inline as a single expression:
splits = [
data.map([(lambda y: (lambda x: y + int(x)))(i) for i in range(4)][x])
for x in range(4)
]
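The outer lambda is applied immediately with (i), so y is bound at definition time, which makes this equivalent to the factory function above. Assuming the same three-line sample.txt as input, collecting should now give the expected offsets:
[rdd.collect() for rdd in splits]
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]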