
Spark avoid creating _temporary directory in S3

I need to upload a dataframe to an S3 bucket, but I do not have delete permissions on the bucket. Is there any way to avoid creating this _temporary directory on S3? For example, can Spark use the local filesystem for the _temporary directory and then upload only the final resulting file to the S3 bucket, or avoid the _temporary directory entirely?

Thanks in advance.

asked Oct 10 '17 by Shubham Jain

People also ask

Does Spark work on S3?

With Amazon EMR release version 5.17.0 and later, you can use S3 Select with Spark on Amazon EMR. S3 Select allows applications to retrieve only a subset of data from an object.

How many partitions does Spark create when a file is loaded from an S3 bucket?

Even when reading a file from an S3 bucket, Spark (by default) creates one partition per block, i.e. total number of partitions = total file size / block size.
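As a quick way to check this, here is a minimal Scala sketch (the bucket and file path are hypothetical placeholders) that reads a file from S3 and prints how many partitions Spark created:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: inspect the partitions Spark creates when reading from S3.
// The bucket and path below are hypothetical placeholders.
val spark = SparkSession.builder().appName("partition-count").getOrCreate()

val df = spark.read.text("s3a://my-bucket/big-file.txt")

// Roughly total file size / block size (128 MB by default), as described above.
println(df.rdd.getNumPartitions)
```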


2 Answers

No.

Data is written into _temporary/jobAttemptID/taskAttemptID/ and then renamed into the destination directory during task/job commit.

What you can do is have your jobs write to HDFS and then copy the output up to S3 using distcp. There are lots of advantages to this, not least being "with a consistent filesystem you don't run the risk of data loss you have from the s3n or s3a connectors".
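A minimal Scala sketch of that approach, with hypothetical paths and bucket name: Spark commits the job on HDFS, then distcp copies the finished files to S3 afterwards.

```scala
import org.apache.spark.sql.SparkSession

// Sketch of the "write to HDFS, then copy up" approach; paths and bucket are hypothetical.
val spark = SparkSession.builder().appName("stage-then-copy").getOrCreate()
val df = spark.range(100).toDF("id")   // stand-in for the dataframe you want to upload

// The _temporary directory and the final rename happen on HDFS, where rename is cheap.
df.write.mode("overwrite").parquet("hdfs:///staging/my_table")

// Afterwards, copy the committed output to S3 outside of Spark, e.g.:
//   hadoop distcp hdfs:///staging/my_table s3a://my-bucket/my_table
```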

2019-07-11 update: the Apache Hadoop S3A committers let you commit work without the temporary folder or the rename, delivering performance and correct results even against an inconsistent S3 store. This is how you can safely commit work to S3. Amazon EMR has its own reimplementation of this work, albeit currently without the complete failure semantics which Spark expects.
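For reference, here is a hedged sketch of enabling the S3A "magic" committer. The exact settings depend on your Spark and Hadoop versions (it assumes Hadoop 3.1+ with the spark-hadoop-cloud classes on the classpath), so verify them against the S3A committer documentation rather than treating this as definitive:

```scala
import org.apache.spark.sql.SparkSession

// Hedged sketch: enable the S3A "magic" committer so output is committed via
// multipart uploads instead of rename. Assumes Hadoop 3.1+ and the
// spark-hadoop-cloud module; check the S3A committer docs for your versions.
val spark = SparkSession.builder()
  .appName("s3a-committer-sketch")
  .config("spark.hadoop.fs.s3a.committer.name", "magic")
  .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()
```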

answered Nov 18 '22 by stevel

Yes, you can avoid creating the _temporary directory when uploading a dataframe to S3.

When Spark appends data to an existing dataset, Spark uses FileOutputCommitter to manage staging output files and final output files.

By default, the output committer algorithm uses version 1. In this version, FileOutputCommitter has two methods, commitTask and commitJob. commitTask moves data generated by a task from the task temporary directory to the job temporary directory, and when all tasks complete, commitJob moves the data from the job temporary directory to the final destination.

However, when the output committer algorithm uses version 2, commitTask moves data generated by a task directly to the final destination, and commitJob is basically a no-op.

How do I set spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version to 2? You can set this config by using any of the following methods:

  • When you launch your cluster, you can put spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2 in the Spark config.
  • At runtime, you can call spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2").
  • When you write data using Dataset API, you can set it in the option, i.e. dataset.write.option("mapreduce.fileoutputcommitter.algorithm.version", "2").
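
For example, a minimal Scala sketch using the per-write option from the last bullet (the output path is hypothetical):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch of the per-write option above; the output path is hypothetical.
val spark = SparkSession.builder().appName("committer-v2-sketch").getOrCreate()
val df = spark.range(100).toDF("id")   // stand-in for your dataframe

df.write
  .option("mapreduce.fileoutputcommitter.algorithm.version", "2") // commitTask writes straight to the destination
  .mode("overwrite")
  .parquet("s3a://my-bucket/output/")
```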

Read more about the output committer algorithm versions in the databricks-blog and mapred-default documentation.

answered Nov 18 '22 by yardstick17