
pySpark forEachPartition - Where is code executed

I'm using pySpark in version 2.3 (I cannot update to 2.4 on my current dev system) and have the following questions concerning foreachPartition.

First a little context: as far as I understand, pySpark UDFs force the Python code to be executed outside the Java Virtual Machine (JVM) in a separate Python instance, which makes them costly in terms of performance. Since I need to apply some Python functions to my data and want to minimize the overhead, my idea was to at least load a manageable chunk of data into the driver and process it as a Pandas DataFrame. However, this would sacrifice the parallelism advantage Spark offers. Then I read that foreachPartition applies a function to all the data within a partition and therefore allows parallel processing.
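
To make clear what I mean by a Python UDF, a minimal sketch would be (the data and the lower-casing are only placeholders for my actual functions):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("TESTA",), ("TESTB",)], ["text"])

# A plain Python UDF: every row crosses the JVM <-> Python boundary,
# which is where the serialization overhead comes from.
to_lower = udf(lambda s: s.lower(), StringType())
df.withColumn("text_lower", to_lower("text")).show()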

My questions now are:

  1. When I apply a Python function via foreachPartition, does the Python execution take place within the driver process (and is the partition data therefore transferred over the network to my driver)?

  2. Is the data processed row-wise within foreachPartition (meaning every RDD row is transferred one by one to the Python instance), or is the partition data processed at once (meaning, for example, the whole partition is transferred to the instance and handled as a whole by a single Python instance)?

Thank you in advance for your input!


Edit:

A working driver-side solution I used before looks like this, taken from SO here:

for partition in rdd.mapPartitions(lambda partition: [list(partition)]).toLocalIterator():
    # Do stuff on the partition (each partition arrives at the driver as a plain list)
    pass

As can be read in the docs, rdd.toLocalIterator() provides the necessary functionality:

Return an iterator that contains all of the elements in this RDD. The iterator will consume as much memory as the largest partition in this RDD.
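
Combined with a Pandas conversion, the driver-side variant looks roughly like this (just a sketch; the Pandas part stands in for whatever Python logic is applied):

import pandas as pd

for partition in rdd.mapPartitions(lambda partition: [list(partition)]).toLocalIterator():
    # each partition arrives at the driver as a plain list of rows
    pdf = pd.DataFrame(partition)
    # ... apply the Python/Pandas logic to pdf here ...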

asked Jan 27 '23 by Markus


1 Answer

Luckily, I stumbled upon this great explanation of mapPartitions by Mrinal (answered here).

mapPartitions applies a function to each partition of an RDD. Hence, parallelization can be exploited if the partitions are distributed over different nodes. The corresponding Python instances, which are necessary for executing the Python functions, are created on those nodes. While foreachPartition only applies a function (e.g. to write your data to a .csv file) without returning anything, mapPartitions also returns a new RDD. Therefore, foreachPartition was the wrong choice for me.

To answer my second question: functions like map or UDFs create a new Python instance and pass the data from the DataFrame/RDD row by row, resulting in a lot of overhead. foreachPartition and mapPartitions (both RDD functions) transfer an entire partition to a Python instance at once.
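
For completeness, a foreachPartition sketch could look like this (it returns nothing, so it is only useful for side effects such as writing the data out; the output path is a placeholder and I assume a DataFrame df like the one in the example below):

import csv
import uuid

def write_partition(partition):
    # The whole partition is handed to this function as an iterator
    # inside one Python worker on an executor; nothing is returned to Spark.
    with open("/tmp/part_{}.csv".format(uuid.uuid4()), "w") as f:  # placeholder path
        writer = csv.writer(f)
        for row in partition:
            writer.writerow(row)

df.rdd.foreachPartition(write_partition)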

Additionally, using generators also reduces the amount of memory needed to iterate over this transferred partition data (partitions are handled as iterator objects, and each row is processed by iterating over that object).

An example might look like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


def generator(partition):
    """
    Yield one result per row of the partition, created by applying some
    function to the row (here: lower-casing the strings in the "text" column).

    @partition: iterator object over the rows of one partition
    """
    for row in partition:
        yield [word.lower() for word in row["text"]]


df = spark.createDataFrame([(["TESTA"], ), (["TESTB"], )], ["text"])
df = df.repartition(2)
df.rdd.mapPartitions(generator).toDF(["text"]).show()


#Result:
+-----+
| text|
+-----+
|testa|
|testb|
+-----+

Hope this helps somebody facing similar problems :)

answered Jan 30 '23 by Markus