
PySpark Evaluation

I am trying the following PySpark code, which adds a number to every row in an RDD and returns a list of RDDs.

from pyspark.context import SparkContext

file = "file:///home/sree/code/scrap/sample.txt"
sc = SparkContext('local', 'TestApp')
data = sc.textFile(file)
splits = [data.map(lambda p: int(p) + i) for i in range(4)]
print splits[0].collect()
print splits[1].collect()
print splits[2].collect()

The content in the input file (sample.txt) is:

1
2
3

I was expecting an output like this (adding 0, 1, 2 respectively to the numbers in the RDD):

[1,2,3]
[2,3,4]
[3,4,5]

whereas the actual output was :

[4, 5, 6]
[4, 5, 6]
[4, 5, 6]

which means that every lambda in the comprehension used only the final value 3 of the variable i, irrespective of range(4).

Why does this happen?

asked Jun 28 '16 by srjit


2 Answers

It happens because of Python's late binding of closure variables and is not (Py)Spark specific. i will be looked up when lambda p: int(p) + i is used, not when it is defined. Typically that means when the function is called, but in this particular context it is when it is serialized to be sent to the workers.
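
To see the same late-binding effect without Spark, here is a minimal plain-Python sketch (the name adders is made up for illustration):

# Plain Python: each lambda looks up i when it is called,
# so all of them see the final value of the loop variable.
adders = [lambda x: x + i for i in range(4)]
print([add(0) for add in adders])  # [3, 3, 3, 3], not [0, 1, 2, 3]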

You can do for example something like this:

def f(i):
    # i is evaluated here, when f is called, so each _f
    # closes over its own value of i.
    def _f(x):
        try:
            return int(x) + i
        except:
            pass
    return _f

data = sc.parallelize(["1", "2", "3"])
splits = [data.map(f(i)) for i in range(4)]
[rdd.collect() for rdd in splits]
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
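
A common alternative, not from this answer but a standard Python idiom, is to bind i as a default argument so that it is evaluated when each lambda is defined; assuming the same data as above, it gives the same result:

splits = [data.map(lambda p, i=i: int(p) + i) for i in range(4)]
[rdd.collect() for rdd in splits]
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]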
answered Oct 19 '22 by zero323

This is due to the fact that the lambdas refer to i by reference rather than by value, so they all see its final value. It has nothing to do with Spark.

You can try this:

# The outer lambda is called immediately with i, so each inner
# lambda gets its own fixed value of y.
a = [(lambda y: (lambda x: y + int(x)))(i) for i in range(4)]
splits = [data.map(a[x]) for x in range(4)]

or, in one line:

splits = [
    data.map([(lambda y: (lambda x: y + int(x)))(i) for i in range(4)][x])
    for x in range(4)
]
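
A quick plain-Python check of the same trick, with no Spark involved (the name adders is illustrative):

# Each immediate call binds y to the current value of i,
# so the resulting functions add different amounts.
adders = [(lambda y: (lambda x: y + int(x)))(i) for i in range(4)]
print([add("1") for add in adders])  # [1, 2, 3, 4]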
answered Oct 19 '22 by Himaprasoon