 

pyspark using one task for mapPartitions when converting rdd to dataframe

I'm confused as to why it appears that Spark is using 1 task for rdd.mapPartitions when converting the resulting RDD to a DataFrame.

This is an issue for me because I would like to go from:

DataFrame --> RDD --> rdd.mapPartitions --> DataFrame

so that I can read in data (DataFrame), apply a non-SQL function to chunks of data (mapPartitions on the RDD), and then convert back to a DataFrame so that I can use the DataFrame.write process.

I am able to go from DataFrame --> mapPartitions and then use an RDD writer like saveAsTextFile, but that is less than ideal since the DataFrame.write process can do things like overwrite and save data in ORC format. So I'd like to learn why this is happening, but from a practical perspective I'm primarily concerned with being able to go from a DataFrame --> mapPartitions --> DataFrame.write process.
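For context, here is a minimal sketch of the pipeline I'm aiming for; the table name, output path, and column names are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Placeholder source table; any DataFrame read would do here
source_df = spark.table("my_source_table")

def process_partition(rows):
    # Stand-in for the non-SQL, per-chunk logic: just pass rows through as tuples
    for row in rows:
        yield (row["var1"], row["var2"])

result_df = source_df.rdd.mapPartitions(process_partition).toDF(["var1", "var2"])

# DataFrame.write supports overwrite and ORC, unlike saveAsTextFile
result_df.write.mode("overwrite").orc("/tmp/my_output_path")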

Here is a reproducible example. The following works as expected, with 100 tasks for the mapPartitions work:

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession \
    .builder \
    .master("yarn-client") \
    .enableHiveSupport() \
    .getOrCreate()

sc = spark.sparkContext

df = pd.DataFrame({'var1':range(100000),'var2': [x-1000 for x in range(100000)]})
spark_df = spark.createDataFrame(df).repartition(100)

def f(part):
    return [(1,2)]

spark_df.rdd.mapPartitions(f).collect()

However, if the last line is changed to something like spark_df.rdd.mapPartitions(f).toDF().show(), then there is only one task for the mapPartitions work.
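For reference, the two variants side by side (only the last line differs):

# 100 tasks for the mapPartitions stage:
spark_df.rdd.mapPartitions(f).collect()

# Only 1 task for the mapPartitions stage:
spark_df.rdd.mapPartitions(f).toDF().show()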

Some screenshots illustrating this below: [Spark UI screenshots showing the number of tasks in each case]

asked Nov 22 '16 by David

People also ask

What does mapPartitions return?

Return a new RDD by applying a function to each partition of this RDD.

What is the difference between MAP and mapPartitions in Spark?

mapPartitions() behaves like map(), except that the function is called once per partition with an iterator of rows. This lets you do heavy initialization (for example, opening a database connection) once per partition instead of once per row.
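A minimal sketch of that pattern, reusing the spark_df from the question; FakeConnection is a dummy stand-in for an expensive resource such as a real database client:

class FakeConnection:
    """Stand-in for an expensive resource, e.g. a database client."""
    def insert(self, row):
        pass
    def close(self):
        pass

def write_partition(rows):
    conn = FakeConnection()   # expensive setup happens once per partition
    count = 0
    for row in rows:
        conn.insert(row)      # per-row work reuses the same connection
        count += 1
    conn.close()
    return [count]            # one summary record per partition

# spark_df.rdd.mapPartitions(write_partition).collect()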


1 Answer

DataFrame.show() only displays the first rows of your DataFrame, by default the first 20. If that number is smaller than the number of rows per partition, Spark is lazy and only evaluates a single partition, so only a single task shows up.

You can also call collect() on the DataFrame to compute and collect all partitions, and you will see 100 tasks again.

You will still see the runJob task first, as before. It is caused by the toDF call, which needs to process a single partition to determine the output types of your mapping function and hence the resulting DataFrame's schema. After this initial stage, the actual action such as collect happens on all partitions. For instance, running your snippet with the last line replaced with spark_df.rdd.mapPartitions(f).toDF().collect() results in these stages:

[Spark UI screenshot: the single-partition runJob stage followed by a 100-task collect stage]
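A minimal sketch of a way to skip that initial schema-inference job, assuming the output of f can be named and typed up front (the column names and output path here are placeholders): passing an explicit StructType to toDF means Spark does not need to sample a partition to infer the output types.

from pyspark.sql.types import StructType, StructField, LongType

# Assumed schema matching f's (1, 2) output, for illustration only
schema = StructType([
    StructField("a", LongType(), True),
    StructField("b", LongType(), True),
])

# With an explicit StructType there is no schema-inference job, and the
# write runs one task per partition
spark_df.rdd.mapPartitions(f).toDF(schema).write.mode("overwrite").orc("/tmp/out")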

answered Sep 20 '22 by sgvd