I am running a Spark job on a cluster that has 2 worker nodes. I am using the code below (Spark Java) to save the computed DataFrame as CSV to the worker nodes.
dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath);
I am trying to understand how Spark writes multiple part files on each worker node.
Run 1) worker1 has part files and _SUCCESS; worker2 has _temporary/task*/part* (each task directory contains its own part files).
Run 2) worker1 has part files and also a _temporary directory; worker2 has multiple part files.
Can anyone help me understand this behavior?
1) Should I consider the records in outputDir/_temporary as part of the output, along with the part files in outputDir?
2) Is the _temporary dir supposed to be deleted after the job run, with its part files moved to outputDir?
3) Why can't it create the part files directly under the output dir?
coalesce(1) and repartition(1) are not options, since the output in outputDir will be around 500 GB.
Spark 2.0.2 / 2.1.3 and Java 8, no HDFS.
In Spark, you can save (write) a DataFrame to CSV files on disk by using dataframe.write().csv("path"); with the same API you can also write the DataFrame to AWS S3, Azure Blob, HDFS, or any other Spark-supported file system.
TL;DR To properly write (or read, for that matter) data using a file-system-based source, you'll need shared storage.
The _temporary directory is part of the basic commit mechanism used by Spark: data is first written to a temporary directory and, once all tasks have finished, atomically moved to the final destination. You can read more about this process in Spark _temporary creation reason.
For this process to be successful you need a shared file system (HDFS, NFS, and so on) or equivalent distributed storage (like S3). Since you don't have one, the failure to clean up the temporary state is expected; see Saving dataframe to local file system results in empty results.
The behavior you observed (data partially committed and partially not) can occur when some executors are co-located with the driver and share a file system with it, enabling a full commit for that subset of the data.
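As a minimal sketch of what writing against shared storage looks like (the HDFS namenode host/port, the source path argument, and the class name below are placeholders, not from the answer):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SharedStorageCsvWrite {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("csv-write-shared-storage")
                .getOrCreate();

        // Source data; args[0] is assumed to point at an existing Parquet dataset.
        Dataset<Row> dataframe = spark.read().parquet(args[0]);

        // Writing to a path that the driver and every executor can reach
        // (HDFS here; an s3a:// or NFS-mounted path works the same way)
        // lets the commit protocol move _temporary output to the final directory.
        dataframe.write()
                .option("header", "false")
                .mode(SaveMode.Overwrite)
                .csv("hdfs://namenode:8020/output/csv"); // placeholder shared path

        spark.stop();
    }
}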
After analysis, I observed that my Spark job was using fileoutputcommitter version 1, which is the default. I then added the config to use fileoutputcommitter version 2 instead of version 1 and tested on a 10-node Spark standalone cluster in AWS. All part-* files were generated directly under the outputDirPath specified in dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath).
We can set the property either by passing --conf 'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2' to the spark-submit command, or by setting it on the SparkContext's Hadoop configuration: javaSparkContext.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2"). A sketch of both approaches follows.
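A minimal sketch of both approaches, assuming a Spark Java job; the application name, jar name, and class names are placeholders, not from the original post:

// 1) On the command line:
// spark-submit \
//   --conf 'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2' \
//   --class com.example.CsvWriteJob csv-write-job.jar
//
// 2) Programmatically, before any write is performed:
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class CommitterV2Config {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("committer-v2-demo")
                // Equivalent to the --conf flag above.
                .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
                .getOrCreate();

        // Or set it directly on the underlying Hadoop configuration.
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2");

        // ... build the DataFrame and call write() as in the question ...
        spark.stop();
    }
}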
I understand the consequences in case of failures, as outlined in the Spark docs, but I achieved the desired result!
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version (default value: 1)
The file output committer algorithm version; valid values are 1 or 2. Version 2 may have better performance, but version 1 may handle failures better in certain situations, as per MAPREDUCE-4815.
Multiple part files are based on your DataFrame's partitioning. The number of files written depends on the number of partitions the DataFrame has at the time you write out the data; by default, one file is written per partition.
You can control this with coalesce or repartition: you can reduce the number of partitions or increase it.
If you coalesce to 1, you won't see multiple part files, but this affects writing the data in parallel.
[outputDirPath = /tmp/multiple.csv]
dataframe
    .coalesce(1)
    .write()
    .option("header", "false")
    .mode(SaveMode.Overwrite)
    .csv(outputDirPath);
On your question of how to refer to it: refer to /tmp/multiple.csv for all of the parts below.
/tmp/multiple.csv/part-00000.csv
/tmp/multiple.csv/part-00001.csv
/tmp/multiple.csv/part-00002.csv
/tmp/multiple.csv/part-00003.csv
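If a single file is not an option (as with the ~500 GB output in the question), the opposite direction is to repartition so the write stays parallel. A minimal sketch, where the partition count of 200 and the class name are just illustrative values:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class RepartitionedCsvWrite {
    // One part file is written per partition, so roughly 200 part files here,
    // and the write runs in parallel across the executors.
    static void writeCsv(Dataset<Row> dataframe, String outputDirPath) {
        dataframe
                .repartition(200)
                .write()
                .option("header", "false")
                .mode(SaveMode.Overwrite)
                .csv(outputDirPath);
    }
}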