I'm using PySpark version 2.3 (I cannot update to 2.4 on my current dev system) and have the following questions concerning foreachPartition.
First a little context: as far as I understand, PySpark UDFs force the Python code to be executed outside the Java Virtual Machine (JVM) in a separate Python instance, which makes them costly in terms of performance.
Since I need to apply some Python functions to my data and want to minimize the overhead, I had the idea of at least loading a manageable chunk of data into the driver and processing it as a Pandas DataFrame. However, this would mean losing the parallelism advantage that Spark offers.
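To illustrate the UDF approach I'm trying to avoid, a minimal sketch (the lower-casing lambda is just a stand-in for my actual Python logic, and spark is the already-running SparkSession):

def udf_sketch():
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    # Every row is serialized from the JVM to a Python worker, processed there,
    # and sent back - this per-row round trip is the overhead I mean.
    lower_udf = udf(lambda s: s.lower(), StringType())

    df = spark.createDataFrame([("TESTA",), ("TESTB",)], ["text"])
    df.withColumn("text", lower_udf(df["text"])).show()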
Then I read that foreachPartition applies a function to all the data within a partition and hence allows parallel processing.
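For reference, the kind of call I have in mind looks roughly like this (process_partition is a hypothetical stand-in for my actual logic):

def process_partition(rows):
    # 'rows' is an iterator over all rows of one partition;
    # foreachPartition returns nothing, so this is purely a side effect.
    for row in rows:
        pass  # apply the Python logic to each row here

df.rdd.foreachPartition(process_partition)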
My questions now are:
When I apply a Python function via foreachPartition, does the Python execution take place within the driver process (and the partition data is therefore transferred over the network to my driver)?
Is the data processed row-wise within foreachPartition (meaning every RDD row is transferred to the Python instance one by one), or is the partition data processed at once (meaning, for example, the whole partition is transferred to the instance and handled as a whole by one Python instance)?
Thank you in advance for your input!
Edit:
A working in-driver solution I used before looks like this, taken from SO here:
for partition in rdd.mapPartitions(lambda partition: [list(partition)]).toLocalIterator():
    # Do stuff on the partition, e.g. turn it into a Pandas DataFrame
    pass
As can be read from the docs, rdd.toLocalIterator() provides the necessary functionality:
Return an iterator that contains all of the elements in this RDD. The iterator will consume as much memory as the largest partition in this RDD.
Luckily, I stumbled upon this great explanation of mapPartitions by Mrinal (answered here).
mapPartitions applies a function to each partition of an RDD. Hence, parallel processing is possible if the partitions are distributed over different nodes. The Python instances needed to execute the Python functions are created on those nodes.
While foreachPartition only applies a function for its side effects (e.g. writing your data to a .csv file), mapPartitions also returns a new RDD. Therefore, foreachPartition was the wrong choice for me.
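For completeness, a foreachPartition call would look roughly like this (a sketch only; the local path and the CSV writing are just the kind of side effect meant above, and each executor would write its own local file):

import csv

def write_partition(rows):
    # Runs on the executor that holds this partition; 'rows' is an iterator
    # over the partition's rows, and nothing is returned to Spark.
    with open("/tmp/partition_output.csv", "a") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)

df.rdd.foreachPartition(write_partition)  # action with side effects, no new RDD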
To answer my second question: functions like map or UDFs create a new Python instance and pass data from the DataFrame/RDD row by row, which results in a lot of overhead. foreachPartition and mapPartitions (both RDD functions) transfer an entire partition to a Python instance.
Additionally, using a generator reduces the amount of memory needed to iterate over this transferred partition data (the partition is handed over as an iterator object, and each row is then processed by iterating over that object).
An example might look like this:
def generator(partition):
    """
    Yield a result produced by some function applied to each row of a
    partition (in this case lower-casing the strings in the "text" column).

    :param partition: iterator object over the rows of the partition
    """
    for row in partition:
        yield [word.lower() for word in row["text"]]


df = spark.createDataFrame([(["TESTA"], ), (["TESTB"], )], ["text"])
df = df.repartition(2)
df.rdd.mapPartitions(generator).toDF(["text"]).show()
#Result:
+-----+
| text|
+-----+
|testa|
|testb|
+-----+
Hope this helps somebody facing similar problems :)