I'm confused as to why it appears that Spark is using 1 task for rdd.mapPartitions
when converting the resulting RDD to a DataFrame.
This is an issue for me because I would like to go from :
DataFrame
--> RDD
--> rdd.mapPartitions
--> DataFrame
so that I can read in data (DataFrame), apply a non-SQL function to chunks of data (mapPartitions on RDD) and then convert back to a DataFrame so that I can using the DataFrame.write
process.
I am able to go from DataFrame --> mapPartitions and then use an RDD writer like saveAsTextFile but that is less than ideal since the DataFrame.write
process can do things like overwrite and save data in Orc format. So I'd like to learn why this is going on, but from a pratical perspective I'm primarily concerned with being able to just go from a DataFrame --> mapParitions --> to using the DataFrame.write process.
Here is a reproducible example. The following works as expected, with 100 tasks for the mapPartitions
work:
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession \
.builder \
.master("yarn-client") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
df = pd.DataFrame({'var1':range(100000),'var2': [x-1000 for x in range(100000)]})
spark_df = spark.createDataFrame(df).repartition(100)
def f(part):
return [(1,2)]
spark_df.rdd.mapPartitions(f).collect()
However if the last line is change to something like spark_df.rdd.mapPartitions(f).toDF().show()
then there will only be one task for the mapPartitions
work.
Some screenshots illustrating this below:
Return a new RDD by applying a function to each partition of this RDD.
mapPartitions() – This is exactly the same as map(); the difference being, Spark mapPartitions() provides a facility to do heavy initializations (for example Database connection) once for each partition instead of doing it on every DataFrame row.
DataFrame.show()
only shows the first number of rows of your dataframe, by default only the first 20. If that number is smaller than the number of rows per partition, Spark is lazy and only evaluates a single partition, which is equivalent to a single task.
You can also do collect
on a dataframe, to compute and collect all partitions and see 100 tasks again.
You will still see the runJob
task first as before, which is caused by the toDF
call to be able to determine the resulting dataframe's schema: it needs to process a single partition to be able to determine the output types of your mapping function. After this initial stage the actual action such as collect
will happen on all partitons. For instance, for me running your snippet with the last line replaced with spark_df.rdd.mapPartitions(f).toDF().collect()
results in these stages:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With