
Achieve concurrency when saving to a partitioned parquet file


When writing a dataframe to parquet using partitionBy:

df.write.partitionBy("col1","col2","col3").parquet(path)

My expectation was that each partition would be written independently by a separate task, in parallel, up to the number of workers assigned to the current Spark job.

However, there is actually only one worker/task running at a time when writing the parquet. That one worker cycles through each of the partitions and writes out the .parquet files serially. Why would this be the case, and is there a way to force concurrency in this spark.write.parquet operation?

The following is not what I want to see (it should be 700% or more):

[screenshot: CPU monitor showing only a single core in use]

Following this other post, I also tried adding repartition in front:

Spark parquet partitioning : Large number of files

df.repartition("col1","col2","col3").write.partitionBy("col1","col2","col3").parquet(path)

This unfortunately had no effect: there is still only one worker.

Note: I am running in local mode with local[8] and have seen other Spark operations run with as many as eight concurrent workers, using up to 750% of the CPU.

Asked Jun 26 '18 by WestCoastProjects

1 Answer

In short, writing multiple output files from a single task is not parallelized, but assuming you have multiple tasks (multiple input splits), each of those will get its own core on a worker.

The goal of writing out partitioned data isn't to parallelize your writing operation. Spark already does that by running multiple tasks at once. The goal is simply to optimize future read operations where you want only one partition of the saved data.

The logic to write partitions in Spark is designed to read all of the records from the previous stage only once when writing them out to their destination. I believe part of the design choice is also to protect against the case where a partition key has many distinct values.

EDIT: Spark 2.x method

In Spark 2.x, Spark sorts the records in each task by their partition keys, then iterates through them, writing to one file handle at a time. I assume this is done to ensure it never opens a huge number of file handles when there are many distinct values in your partition keys.

For reference, here is the sorting:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L121

Scroll down a little and you will see it calling write(iter.next()), looping through each row.

And here is the actual writing (one file/partition key at a time):

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L121

There you can see it only holds one file handle open at a time.
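The sort-then-stream approach can be sketched in plain Python. This is a toy simulation, not Spark code: the rows, path format, and in-memory "filesystem" are all made up for illustration.

```python
# Toy simulation of the Spark 2.x approach: sort by the partition key
# first, then stream through the rows holding only one "file" open at
# a time. Nothing here is a Spark API.

rows = [
    {"col1": "b", "value": 2},
    {"col1": "a", "value": 1},
    {"col1": "b", "value": 3},
    {"col1": "a", "value": 4},
]

# In-memory stand-in for the filesystem: path -> list of written values.
files = {}

current_path = None
current_file = None
for row in sorted(rows, key=lambda r: r["col1"]):  # sort by partition key
    path = "col1=%s/part-00000.parquet" % row["col1"]
    if path != current_path:
        # Partition key changed: "close" the old handle, "open" a new one.
        current_path = path
        current_file = files.setdefault(path, [])
    current_file.append(row["value"])

print(files)
# {'col1=a/part-00000.parquet': [1, 4], 'col1=b/part-00000.parquet': [2, 3]}
```

Because the input is sorted, all rows for a given partition key arrive contiguously, so a single handle suffices at any moment.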

EDIT: Spark 1.x method

In Spark 1.x, a given task loops through all the records, opening a new file handle whenever it encounters an output partition it hasn't seen before in this task. It then immediately writes the record to that file handle and moves on to the next record. This means that at any given time, while processing a single task, it can have up to N file handles open for that one task, where N is the maximum number of output partitions. To make this clearer, here is some Python pseudo-code to show the general idea:

# To write out records in a single InputSplit/Task
handles = {}
for row in input_split:
    partition_path = determine_output_path(row, partition_keys)
    if partition_path not in handles:
        # First record for this output partition: open a new handle
        handles[partition_path] = open(partition_path, 'w')

    handles[partition_path].write(row)

# Close every handle once the task's records are exhausted
for handle in handles.values():
    handle.close()
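The determine_output_path call above is pseudo-code. A possible implementation (hypothetical, not an actual Spark function) would build the Hive-style directory path that Spark uses for partitioned output:

```python
# Hypothetical helper for the pseudo-code above: builds the Hive-style
# path (base/col1=val1/col2=val2/...) that Spark uses when laying out
# partitioned output. Not an actual Spark API.
def determine_output_path(row, partition_keys, base_path="out"):
    parts = ["%s=%s" % (key, row[key]) for key in partition_keys]
    return "/".join([base_path] + parts)

row = {"col1": "a", "col2": "x", "col3": 1, "value": 42}
print(determine_output_path(row, ["col1", "col2", "col3"]))
# out/col1=a/col2=x/col3=1
```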

There is a caveat to the above strategy for writing out records. In Spark 1.x, the setting spark.sql.sources.maxConcurrentWrites put an upper limit on the number of file handles that could be open per task. After that limit was reached, Spark would instead sort the data by the partition key, so it could iterate through the records writing out one file at a time.
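That fallback can also be simulated in a few lines of plain Python. Everything here is a hypothetical stand-in: write_task, the row format, and max_open (which plays the role of spark.sql.sources.maxConcurrentWrites) are invented for illustration, and "files" is an in-memory substitute for the filesystem.

```python
# Toy simulation of the Spark 1.x strategy with a handle limit:
# open a handle per new partition key until max_open is reached,
# then fall back to sorting the remaining rows and writing serially.
def write_task(rows, key, max_open=2):
    files = {}    # path -> list of written values (fake filesystem)
    handles = {}  # currently "open" handles
    for i, row in enumerate(rows):
        path = "key=%s" % row[key]
        if path not in handles:
            if len(handles) >= max_open:
                # Handle limit reached: sort the rest by the partition
                # key and write them out one file at a time.
                for r in sorted(rows[i:], key=lambda r: r[key]):
                    files.setdefault("key=%s" % r[key], []).append(r["value"])
                return files
            handles[path] = files.setdefault(path, [])
        handles[path].append(row["value"])
    return files

rows = [{"k": "a", "value": 1}, {"k": "b", "value": 2},
        {"k": "c", "value": 3}, {"k": "a", "value": 4}]
print(write_task(rows, "k"))
# {'key=a': [1, 4], 'key=b': [2], 'key=c': [3]}
```

With max_open=2, the third distinct key ("c") triggers the sorted fallback, yet every row still ends up in the right output file.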

Answered Sep 19 '22 by Ryan Widmaier