
Multiple spark jobs appending parquet data to same base path with partitioning

I have multiple jobs that I want to execute in parallel, each appending daily data into the same base path using partitioning.

e.g.

dataFrame.write()
    .partitionBy("eventDate", "category")
    .mode(Append)
    .parquet("s3://bucket/save/path");

Job 1 - category = "billing_events"
Job 2 - category = "click_events"

Both of these jobs will truncate any existing partitions in the S3 bucket prior to execution and then save the resulting parquet files to their respective partitions.

i.e.

job 1 -> s3://bucket/save/path/eventDate=20160101/category=billing_events

job 2 -> s3://bucket/save/path/eventDate=20160101/category=click_events
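
Roughly, each job does something like the sketch below (Scala, Spark 1.6). The Hadoop FileSystem delete of the partition prefix and the variable names (sc, dataFrame) are illustrative, not my exact code:

    import java.net.URI
    import org.apache.hadoop.fs.{FileSystem, Path}

    val basePath = "s3://bucket/save/path"
    // each job only truncates its own category for the day it is loading
    val partitionPath = s"$basePath/eventDate=20160101/category=billing_events"

    // sc is the SparkContext; delete the existing partition before appending
    val fs = FileSystem.get(new URI(basePath), sc.hadoopConfiguration)
    fs.delete(new Path(partitionPath), true) // recursive delete

    dataFrame.write
      .partitionBy("eventDate", "category")
      .mode("append")
      .parquet(basePath)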

The problem I'm facing is the temporary files that Spark creates during job execution. It saves its working files under the base path:

s3://bucket/save/path/_temporary/...

so both jobs end up sharing the same temp folder, which causes conflicts. I've noticed this can cause one job to delete the other's temp files, and that job then fails with a 404 from S3 saying an expected temp file doesn't exist.

Has anyone faced this issue and come up with a strategy to have parallel execution of jobs in the same base path?

I'm using Spark 1.6.0 for now.

asked Aug 16 '16 by vcetinick

1 Answer

So after much reading about how to tackle this problem, I thought I'd transfer some wisdom back here to wrap things up. Thanks mostly to Tal's comments.

I've additionally found that writing directly to s3://bucket/save/path seems dangerous: if a job is killed and the cleanup of the temporary folder doesn't happen at the end of the job, it is left there for the next job, and I've noticed that sometimes the previously killed job's temp files land in s3://bucket/save/path and cause duplication... Totally unreliable...

Additionally, renaming the files in the _temporary folder to their final S3 keys takes a horrendous amount of time (approx. 1 second per file), since S3 only supports copy/delete, not rename. Furthermore, only the driver renames these files, using a single thread, so as much as 1/5 of the runtime of some jobs with large numbers of files/partitions is spent just waiting for rename operations.

I've ruled out using the DirectOutputCommitter for a number of reasons.

  1. When used in conjunction with speculative execution it results in duplicated output (https://issues.apache.org/jira/browse/SPARK-9899).
  2. Task failures leave clutter behind which is impossible to find and remove/clean later.
  3. Spark 2.0 has removed support for it completely and no upgrade path exists (https://issues.apache.org/jira/browse/SPARK-10063).
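
For reference, this is roughly how the direct committer is enabled for parquet writes in Spark 1.6; the config key and class name are from the 1.6 docs as I remember them, so treat them as assumptions:

    // sqlContext is the SQLContext used for the DataFrame writes.
    // Setting this makes parquet output go straight to the final location,
    // skipping the _temporary rename step -- which is exactly what causes
    // the problems listed above.
    sqlContext.setConf(
      "spark.sql.parquet.output.committer.class",
      "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter")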

The only safe, performant, and consistent way to execute these jobs is to save them to a unique temporary folder (unique by applicationId or timestamp) in HDFS first, and then copy the output to S3 on job completion.

This allows concurrent jobs to execute because they save to unique temp folders, removes the need for the DirectOutputCommitter since the rename operation on HDFS is quicker than on S3, and the saved data is more consistent.
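
A rough sketch of the approach (Scala, Spark 1.6). The HDFS staging path, the FileUtil.copy-based upload, and the variable names are illustrative assumptions; on EMR a tool like s3-dist-cp is a common alternative for the final copy step:

    import java.net.URI
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

    // 1. Write to a job-unique HDFS folder, so each job gets its own _temporary dir.
    val appId = sc.applicationId // a timestamp works just as well
    val stagingPath = s"hdfs:///tmp/staging/$appId"
    val finalPath = "s3://bucket/save/path"

    dataFrame.write
      .partitionBy("eventDate", "category")
      .mode("append")
      .parquet(stagingPath)

    // 2. Once the job has committed on HDFS, copy each partition folder to S3
    //    and clean up the staging area.
    val conf = sc.hadoopConfiguration
    val srcFs = FileSystem.get(new URI(stagingPath), conf)
    val dstFs = FileSystem.get(new URI(finalPath), conf)

    srcFs.listStatus(new Path(stagingPath))
      .filterNot(_.getPath.getName.startsWith("_")) // skip _SUCCESS etc.
      .foreach { status =>
        FileUtil.copy(srcFs, status.getPath, dstFs, new Path(finalPath), false, conf)
      }
    srcFs.delete(new Path(stagingPath), true)

The copy to S3 still costs transfer time, but it happens once against finished output rather than interleaved with task commits, and a killed job only ever leaves debris under its own staging folder instead of under the shared base path.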

answered Sep 26 '22 by vcetinick