Merge Spark output CSV files with a single header

I want to create a data processing pipeline in AWS to eventually use the processed data for Machine Learning.

I have a Scala script that takes raw data from S3, processes it and writes it to HDFS or even S3 with Spark-CSV. I think I can use multiple files as input if I want to use the AWS Machine Learning tool for training a prediction model. But if I want to use something else, I presume it is best to have a single CSV output file.

Currently, as I do not want to use repartition(1) or coalesce(1) for performance reasons, I have used hadoop fs -getmerge for manual testing. But as it just concatenates the contents of the job output files, I am running into a small problem: I need a single row of headers in the data file for training the prediction model.

If I use .option("header","true") for spark-csv, it writes the header to every output file, and after merging I have as many header lines in the data as there were output files. But if the header option is false, it does not add any headers.
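For reference, the writer call in question looks roughly like this (a sketch; df and the output path are placeholders):

    //with header = "true", every part file in the output directory
    //starts with its own header line
    df.write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save("s3://bucket/output")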

Now I found an option to merge the files inside the Scala script using the Hadoop API's FileUtil.copyMerge. I tried this in spark-shell with the code below.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

    val configuration = new Configuration()
    val fs = FileSystem.get(configuration)
    FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")

But this solution still just concatenates the files on top of each other and does not handle headers. How can I get an output file with only one row of headers?

I even tried adding df.columns.mkString(",") as the last argument for copyMerge, but this still added the header multiple times, not once (copyMerge's addString argument is appended after every merged file rather than written once at the top).

V. Samma asked Jun 27 '16



2 Answers

You can work around it like this:

  1. Create a new DataFrame (headerDF) containing the header names.
  2. Union it with the DataFrame (dataDF) containing the data.
  3. Write the unioned DataFrame to disk with option("header", "false").
  4. Merge the partition files (part-0000**0.csv) using Hadoop's FileUtil.

This way, no partition contains a header, except that one partition holds the single row of header names from headerDF. When all partitions are merged together, there is a single header at the top of the file. Sample code follows:

    import scala.collection.JavaConverters._
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
    import org.apache.spark.sql.{Row, SaveMode}

    //dataFrame is the data to save on disk
    //cast types of all columns to String
    val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)

    //create a new data frame containing only the header names
    val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)

    //merge header names with data
    headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)

    //use hadoop FileUtil to merge all partition csv files into a single file
    val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
    val fs = FileSystem.get(hadoopConf)
    FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, hadoopConf, null)
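Note that FileUtil.copyMerge was removed in Hadoop 3.0, so on newer clusters you would need to reimplement the merge with the FileSystem API yourself; the second answer below sketches that approach.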
Kang answered Oct 11 '22


  1. Output the header using the DataFrame schema (val header = dataDF.schema.fieldNames.reduce(_ + "," + _)).
  2. Create a file containing that header on DSEFS.
  3. Append all the (headerless) partition files to the file from step 2 using the Hadoop FileSystem API (see the sketch after this list).
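A minimal sketch of that approach, assuming the files live on an HDFS-compatible file system; mergeWithHeader and the paths are hypothetical names, not part of the answer:

    import java.nio.charset.StandardCharsets
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.io.IOUtils

    //hypothetical helper: writes the header line once, then streams every
    //part-* file from srcDir into a single target file
    def mergeWithHeader(conf: Configuration, srcDir: String, target: String, header: String): Unit = {
      val fs = FileSystem.get(conf)
      val out = fs.create(new Path(target), true) //overwrite if the target exists
      try {
        //step 2: the header line goes in first
        out.write((header + "\n").getBytes(StandardCharsets.UTF_8))
        //step 3: append each headerless partition file, in name order
        val parts = fs.listStatus(new Path(srcDir))
          .map(_.getPath)
          .filter(_.getName.startsWith("part-"))
          .sortBy(_.getName)
        for (part <- parts) {
          val in = fs.open(part)
          try IOUtils.copyBytes(in, out, conf, false)
          finally in.close()
        }
      } finally out.close()
    }

    //step 1: build the header from the DataFrame schema, then merge
    //val header = dataDF.schema.fieldNames.reduce(_ + "," + _)
    //mergeWithHeader(spark.sparkContext.hadoopConfiguration, "smallheaders", "/home/hadoop/merged.csv", header)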
Sam Jacob answered Oct 11 '22