I have multiple jobs that I want to execute in parallel, each appending daily data into the same base path using partitioning.
e.g.
    dataFrame.write()
        .partitionBy("eventDate", "category")
        .mode(SaveMode.Append)
        .parquet("s3://bucket/save/path");
Job 1 -> category = "billing_events"
Job 2 -> category = "click_events"
Both of these jobs truncate any existing partitions in the S3 bucket prior to execution and then save the resulting parquet files to their respective partitions (sketched after the example paths below).
i.e.
job 1 -> s3://bucket/save/path/eventDate=20160101/category=billing_events
job 2 -> s3://bucket/save/path/eventDate=20160101/category=click_events
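For reference, here is a minimal sketch of the truncate-then-append step for one of the jobs, assuming Spark 1.6's Java DataFrame API. The class name BillingEventsJob, the writeDailyPartition helper, and the exact paths are illustrative, and the s3:// scheme is assumed to resolve to whatever S3 filesystem is configured on the cluster (e.g. EMRFS):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SaveMode;

    public class BillingEventsJob {
        // Hypothetical helper: truncate this job's partition, then append the day's data.
        public static void writeDailyPartition(DataFrame dataFrame, String eventDate) throws Exception {
            String basePath = "s3://bucket/save/path";
            Path partition = new Path(basePath + "/eventDate=" + eventDate + "/category=billing_events");

            // Delete only this job's partition so a rerun does not duplicate data.
            FileSystem fs = FileSystem.get(new URI(basePath), new Configuration());
            fs.delete(partition, true); // recursive; returns false if the partition does not exist

            dataFrame.write()
                    .partitionBy("eventDate", "category")
                    .mode(SaveMode.Append)
                    .parquet(basePath);
        }
    }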
The problem I'm facing is the temporary files Spark creates during job execution. It writes these intermediate files under the base path:
s3://bucket/save/path/_temporary/...
so both jobs end up sharing the same temp folder and conflict. I've noticed this can cause one job to delete the other's temp files, and the other job then fails with a 404 from S3 saying an expected temp file doesn't exist.
Has anyone faced this issue and come up with a strategy to have parallel execution of jobs in the same base path?
I'm using Spark 1.6.0 for now.
After much reading about how to tackle this problem, I thought I'd transfer some wisdom back here to wrap things up. Thanks mostly to Tal's comments.
I've additionally found that writing directly to s3://bucket/save/path seems dangerous: if a job is killed and the cleanup of the temporary folder doesn't happen at the end of the job, the folder is left behind for the next job, and I've noticed that a previously killed job's temp files sometimes end up under s3://bucket/save/path and cause duplication... Totally unreliable.
Additionally, renaming the files in the _temporary folder to their final S3 locations takes a horrendous amount of time (roughly 1 second per file), because S3 only supports copy/delete, not rename. On top of that, only the driver performs these renames, using a single thread, so for jobs with large numbers of files/partitions as much as 1/5 of the runtime is spent just waiting on rename operations.
I've ruled out using the DirectOutputCommitter for a number of reasons.
The only safe, performant, and consistent way to execute these jobs is to save them to a unique temporary folder (unique by applicationId or timestamp) in HDFS first, and then copy the output to S3 on job completion.
This allows concurrent jobs to execute safely since each writes to its own temp folder, there is no need for the DirectOutputCommitter because the rename operation on HDFS is far quicker than on S3, and the saved data is more consistent.
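Here is a minimal sketch of that staging pattern, again assuming the Spark 1.6 Java API. The StagedS3Writer class, the hdfs:///tmp/staging location, and the driver-side copy loop are illustrative assumptions; on EMR a distributed copy tool like s3-dist-cp would normally do the final copy instead of FileUtil.copy:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.spark.SparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SaveMode;

    public class StagedS3Writer {
        public static void writeViaHdfsStaging(SparkContext sc, DataFrame dataFrame) throws Exception {
            Configuration conf = sc.hadoopConfiguration();

            // Unique per application, so concurrent jobs never share a _temporary folder.
            String stagingDir = "hdfs:///tmp/staging/" + sc.applicationId();
            String finalDir = "s3://bucket/save/path";

            // 1. Write (and let the output committer rename) entirely on HDFS, where rename is cheap.
            dataFrame.write()
                    .partitionBy("eventDate", "category")
                    .mode(SaveMode.Append)
                    .parquet(stagingDir);

            // 2. After the write has completed successfully, copy the partition folders to S3.
            FileSystem hdfs = FileSystem.get(new URI(stagingDir), conf);
            FileSystem s3 = FileSystem.get(new URI(finalDir), conf);
            Path dest = new Path(finalDir);
            s3.mkdirs(dest);
            for (FileStatus partition : hdfs.listStatus(new Path(stagingDir))) {
                // Copies e.g. eventDate=20160101 under s3://bucket/save/path/
                FileUtil.copy(hdfs, partition.getPath(), s3, dest, false, conf);
            }

            // 3. Clean up the staging folder once the copy has finished.
            hdfs.delete(new Path(stagingDir), true);
        }
    }

Because the staging folder is keyed by applicationId, a killed job at worst leaves an orphaned folder on HDFS rather than stray files under the final S3 path, and nothing reaches S3 until the write has fully completed.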