
Spark Dataframe Write to CSV creates _temporary directory file in Standalone Cluster Mode

I am running a Spark job in a cluster that has 2 worker nodes. I am using the code below (Spark Java) to save the computed dataframe as CSV to the worker nodes.

dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath);

I am trying to understand how Spark writes multiple part files on each worker node.

Run 1) worker1 has part files and _SUCCESS; worker2 has _temporary/task*/part* (each task directory has its own part files).

Run 2) worker1 has part files and also a _temporary directory; worker2 has multiple part files.

Can anyone help me understand this behavior? 1) Should I consider the records in outputDir/_temporary as part of the output along with the part files in outputDir?

2) Is the _temporary dir supposed to be deleted after the job runs, with its part files moved to outputDir?

3) Why can't it create the part files directly under the output dir?

coalesce(1) and repartition(1) are not an option, since the output itself will be around 500 GB.

Spark 2.0.2 and 2.1.3, Java 8, no HDFS.

asked Aug 30 '18 by Omkar Puttagunta



3 Answers

TL;DR To properly write (or read, for that matter) data using a file-system-based source, you'll need shared storage.

The _temporary directory is part of the basic commit mechanism used by Spark: data is first written to a temporary directory and, once all tasks have finished, atomically moved to the final destination. You can read more about this process in Spark _temporary creation reason.

For this process to succeed you need a shared file system (HDFS, NFS, and so on) or equivalent distributed storage (like S3). Since you don't have one, the failure to clean up the temporary state is expected; see Saving dataframe to local file system results in empty results.

The behavior you observed (data partially committed and partially not) can occur when some executors are co-located with the driver and share its file system, enabling a full commit for that subset of the data.
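For illustration, a minimal sketch of writing to shared storage instead of each worker's local disk, assuming an S3 bucket; the bucket name, credentials, and session setup are placeholders, not from the question:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("csv-writer")
        .getOrCreate();

// Hypothetical S3 credentials; any storage visible to the driver and all executors works.
spark.sparkContext().hadoopConfiguration().set("fs.s3a.access.key", "<ACCESS_KEY>");
spark.sparkContext().hadoopConfiguration().set("fs.s3a.secret.key", "<SECRET_KEY>");

Dataset<Row> dataframe = spark.read().parquet("s3a://my-bucket/input/");  // placeholder input

// Because s3a:// is visible to every node, the commit step can move files out of
// _temporary into the final destination, and no stray _temporary dirs are left behind.
dataframe.write()
        .option("header", "false")
        .mode(SaveMode.Overwrite)
        .csv("s3a://my-bucket/output/");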

answered Oct 06 '22 by 2 revs


After analysis, I observed that my Spark job was using FileOutputCommitter version 1, which is the default. I then included the config to use FileOutputCommitter version 2 instead of version 1 and tested it on a 10-node Spark standalone cluster in AWS. All part-* files are now generated directly under the outputDirPath specified in dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath).

We can set the property in either of two ways (a short sketch follows the list):

  1. By passing --conf 'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2' in the spark-submit command

  2. Or by setting the property on the Spark context: javaSparkContext.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")
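
As an illustrative sketch of option 2, the session bootstrap below is assumed and not from the original answer:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("csv-writer")
        .getOrCreate();

// With algorithm version 2, each task commits its output directly to the final
// output directory instead of leaving it for a single job-level commit.
JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
javaSparkContext.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2");

dataframe.write().option("header", "false").mode(SaveMode.Overwrite).csv(outputDirPath);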

I understand the consequences in case of failures, as outlined in the Spark docs, but I achieved the desired result!

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version (default value: 1)
The file output committer algorithm version; valid values are 1 or 2. Version 2 may have better performance, but version 1 may handle failures better in certain situations, as per MAPREDUCE-4815.

answered Oct 06 '22 by Omkar Puttagunta


Multiple part files are based on your DataFrame's partitioning. The number of files written depends on the number of partitions the DataFrame has at the time you write out the data; by default, one file is written per partition.

You can control this by using coalesce or repartition to reduce or increase the number of partitions.

If you coalesce to 1, you won't see multiple part files, but this prevents the data from being written in parallel.

[outputDirPath = /tmp/multiple.csv ]

dataframe
 .coalesce(1)
 .write()
 .option("header", "false")
 .mode(SaveMode.Overwrite)
 .csv(outputDirPath);

On your question of how to refer to it:

Refer to it as /tmp/multiple.csv; it contains all of the parts below.

/tmp/multiple.csv/part-00000.csv
/tmp/multiple.csv/part-00001.csv
/tmp/multiple.csv/part-00002.csv
/tmp/multiple.csv/part-00003.csv
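
For completeness, a small sketch of reading the whole directory back as one DataFrame; the spark session variable is assumed:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Point the reader at the directory; Spark picks up every part-* file inside it.
Dataset<Row> readBack = spark.read()
        .option("header", "false")
        .csv("/tmp/multiple.csv");

readBack.show(5);
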
answered Oct 06 '22 by Karthick