I want to create a data processing pipeline in AWS to eventually use the processed data for Machine Learning.
I have a Scala script that takes raw data from S3, processes it, and writes it to HDFS or even S3 with Spark-CSV. I think I can use multiple files as input if I want to use the AWS Machine Learning tool for training a prediction model. But if I want to use something else, I presume it is best if I get a single CSV output file.
Currently, since I do not want to use repartition(1) or coalesce(1) for performance reasons, I have used hadoop fs -getmerge for manual testing, but as it just merges the contents of the job's output files, I am running into a small problem: I need a single row of headers in the data file for training the prediction model.
If I use .option("header", "true") for spark-csv, it writes the header to every output file, and after merging I have as many header lines in the data as there were output files. But if the header option is false, it does not add any headers at all.
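For illustration, here is a minimal sketch of the kind of write that produces this behaviour, using Spark's built-in CSV source; the paths and session setup are placeholders, not from my actual script:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("csv-header-demo").getOrCreate()

// Read the raw data; the S3 path is a placeholder.
val df = spark.read.option("header", "true").csv("s3://bucket/raw")

// With header = true, the column names are written at the top of EVERY
// part-XXXXX file (one per partition), so a later getmerge repeats them.
df.write.option("header", "true").csv("s3://bucket/processed")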
Now I found an option to merge the files inside the Scala script with the Hadoop API FileUtil.copyMerge. I tried this in spark-shell with the code below.
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val configuration = new Configuration()
val fs = FileSystem.get(configuration)
// merge all files under "smallheaders" into a single output file
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")
But this solution still just concatenates the files on top of each other and does not handle headers. How can I get an output file with only one row of headers?
I even tried passing df.columns.mkString(",") as the last argument of copyMerge, but this still added the header multiple times, not once (presumably because that last argument is a string that copyMerge appends after every merged part file).
Spark SQL provides spark.read().csv("file_name") to read a file or a directory of CSV files into a DataFrame, and dataframe.write().csv("path") to write one out. To include the column names, use option("header", "true"); the CSV data source provides several such options. Note that if the DataFrame has, say, 3 partitions, Spark creates 3 part files when you save it to the file system.
The usual ways to end up with a single CSV file are: write a single file using Spark coalesce() or repartition(), write a single file using the Hadoop FileSystem library, merge the part files with HDFS getmerge, or write a single file in Databricks.
1. Write a single file using Spark coalesce() and repartition(): when you are ready to write a DataFrame, first use repartition(1) or coalesce(1) to merge the data from all partitions into a single partition, then save it. This still creates a directory, but it contains a single part file instead of multiple part files. A sketch of this approach follows.
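Here is a minimal, hedged sketch of that coalesce() approach; the output path is a placeholder and df stands for the processed DataFrame from the earlier sketch. It still produces a directory with one part file inside:

// Collapse to one partition so only one part file is written; this funnels
// all data through a single task, which is the performance concern above.
df.coalesce(1)
  .write
  .mode("overwrite")
  .option("header", "true")
  .csv("s3://bucket/single-output")  // placeholder output path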
You can work around it like this. This way, none of the partitions has a header, except that a single partition's content starts with a row of column names coming from headerDF. When all partitions are merged together, there is a single header at the top of the file. Sample code follows.
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.{Row, SaveMode}
import scala.collection.JavaConverters._

// dataFrame is the data to save on disk
// cast all columns to String so the header row can be unioned with the data
val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)

// create a new DataFrame containing only the header names as a single row
val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)

// prepend the header row to the data and write without the CSV header option
headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)

// use Hadoop FileUtil to merge all partition csv files into a single file
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, sparkSession.sparkContext.hadoopConfiguration, null)
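As a quick sanity check (a hedged sketch, reusing the target path from above), you could read the merged file back and confirm the schema and row count:

// Read the merged file back; with exactly one header row, Spark should
// recover the original column names, and the count should match dataDF.
val merged = sparkSession.read.option("header", "true").csv("/folder/target.csv")
merged.printSchema()
println(merged.count())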