
Writing CSV file using Spark and java - handling empty values and quotes

The initial data is in a Dataset<Row>, and I am trying to write it to a pipe-delimited file. I want every non-empty, non-null value to be placed in quotes; empty or null values should not get quotes.

result.coalesce(1).write()
            .option("delimiter", "|")
            .option("header", "true")
            .option("nullValue", "")
            .option("quoteAll", "false")
            .csv(Location);

Expected output:

"London"||"UK"
"Delhi"|"India"
"Moscow"|"Russia"

Current output:

London||UK
Delhi|India
Moscow|Russia

If I change "quoteAll" to "true", the output I get is:

"London"|""|"UK"
"Delhi"|"India"
"Moscow"|"Russia"

The Spark version is 2.3 and the Java version is 8.

asked Feb 26 '20 by Ram Grandhi


People also ask

How do you ignore double quotes when reading a CSV file in Spark?

quote – sets a single character used for escaping quoted values where the separator can be part of the value. By default it is the double-quote character, ". If you would like to turn off quoting, you need to set it to an empty string.
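
A minimal Java sketch of that, assuming an existing SparkSession named spark and a hypothetical input path:

Dataset<Row> df = spark.read()
        .option("header", "true")
        .option("quote", "")        // empty string turns quote handling off
        .csv("/tmp/input.csv");     // hypothetical path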

How does Spark deal with null values?

Spark's fill(value: String) signatures are used to replace null values with an empty string or any constant String on DataFrame or Dataset columns.
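
A short sketch, reusing the result Dataset from the question:

// replace nulls in all string columns with an empty string
Dataset<Row> filled = result.na().fill("");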

How to write spark dataframe to CSV file?

Use the write() method of the Spark DataFrameWriter object to write a Spark DataFrame to a CSV file. For a detailed example, refer to Writing Spark DataFrame to CSV File using Options.
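
A minimal hedged example (the output path is an assumption):

result.write()
        .option("header", "true")
        .csv("/tmp/output");        // Spark creates a folder of part files here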

How do I read a CSV file in Apache Spark?

Spark SQL provides spark.read.csv("path") to read a CSV file into a Spark DataFrame and dataframe.write.csv("path") to save or write to a CSV file. Spark supports reading pipe, comma, tab, or any other delimiter/separator files.
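
For a pipe-delimited file like the one in this question, a sketch might look like this (the path is hypothetical):

Dataset<Row> df = spark.read()
        .option("delimiter", "|")
        .option("header", "true")
        .csv("/tmp/cities.csv");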

How does spark write to multiple files?

By design, when you save an RDD, DataFrame, or Dataset, Spark creates a folder with the name specified in the path and writes the data as multiple part files in parallel (one part file per partition). Each part file will have the extension of the format you write (for example .csv, .json, .txt, etc.).
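
For example, a sketch that controls how many part files get written (the partition count of 4 is arbitrary):

result.repartition(4)           // four partitions => four part files
        .write()
        .option("header", "true")
        .csv("/tmp/output");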

How to write a CSV file using opencsv jar?

You can download the OpenCSV jar and include it in your project classpath. Writing a CSV file is as simple as reading one: create an instance of CSVWriter by passing a FileWriter object as a parameter, and start writing data to the CSV file using the methods of the CSVWriter class.
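
A minimal sketch, assuming OpenCSV 3+ (the com.opencsv package) and a hypothetical file name:

import com.opencsv.CSVWriter;
import java.io.FileWriter;
import java.io.IOException;

public class CsvDemo {
    public static void main(String[] args) throws IOException {
        try (CSVWriter writer = new CSVWriter(new FileWriter("cities.csv"))) {
            writer.writeNext(new String[]{"city", "country"}); // header row
            writer.writeNext(new String[]{"London", "UK"});
        }
    }
}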


2 Answers

A Java answer. CSV escaping is not just adding " symbols around a value; you also have to handle " characters inside the strings. So let's use StringEscapeUtils, define a UDF that calls it, and then apply the UDF to each column.

import org.apache.commons.text.StringEscapeUtils;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

import java.util.Arrays;

public class Test {

    void test(Dataset<Row> result, String Location) {
        // define UDF
        UserDefinedFunction escape = udf(
            // guard against null so the UDF does not throw a NullPointerException
            (String str) -> (str == null || str.isEmpty()) ? "" : StringEscapeUtils.escapeCsv(str),
            DataTypes.StringType
        );
        // call udf for each column
        Column[] columns = Arrays.stream(result.schema().fieldNames())
                .map(f -> escape.apply(col(f)).as(f))
                .toArray(Column[]::new);

        // save the result
        result.select(columns)
                .coalesce(1).write()
                .option("delimiter", "|")
                .option("header", "true")
                .option("nullValue", "")
                .option("quoteAll", "false")
                .csv(Location);
    }
}

Side note: coalesce(1) is a bad call. It collects all the data on one executor, so you can get an executor OOM in production for a huge dataset.
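
If a single output file is not a hard requirement, a hedged alternative is to drop coalesce(1), let each partition write its own part file, and merge them outside Spark afterwards if needed:

// sketch: parallel write, one part file per partition; merge later
// (e.g. with hdfs dfs -getmerge) if one file is really required
result.select(columns)
        .write()
        .option("delimiter", "|")
        .option("header", "false")  // avoid repeating the header in every part file
        .option("nullValue", "")
        .csv(Location);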

answered Oct 21 '22 by Artem Aliev


EDIT & warning: I did not see the Java tag. This is a Scala solution that uses foldLeft as a loop over all the columns; if that is replaced by a Java-friendly loop (see the sketch after the Scala version below), everything should work as is. I will try to look back at this at a later time.

A programmatic solution could be

val columns = result.columns
val randomColumnName = "RND"

val result2 = columns.foldLeft(result) { (data, column) =>
  data
    .withColumnRenamed(column, randomColumnName)
    .withColumn(column,
      when(col(randomColumnName).isNull, "")
        .otherwise(concat(lit("\""), col(randomColumnName), lit("\"")))
    )
    .drop(randomColumnName)
}
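
A hedged Java translation of the same loop (assuming the usual static imports from org.apache.spark.sql.functions and the result Dataset<Row> from the question):

Dataset<Row> result2 = result;
for (String column : result.columns()) {
    // withColumn with an existing name replaces that column in place
    result2 = result2.withColumn(column,
            when(col(column).isNull(), "")
                    .otherwise(concat(lit("\""), col(column), lit("\""))));
}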

This will produce the strings with " around them and write empty strings where there were nulls. If you need to keep the nulls, just keep them.

Then just write it out:

result2.coalesce(1).write()
            .option("delimiter", "|")
            .option("header", "true")
            .option("quoteAll", "false")
            .csv(Location);
answered Oct 21 '22 by Saša Zejnilović