
Handling schema mismatches in Spark

I am reading a CSV file using Spark in Scala. The schema is predefined, and I am using it while reading. This is the example code:

// create the schema
val schema = StructType(Array(
    StructField("col1", IntegerType, false),
    StructField("col2", StringType, false),
    StructField("col3", StringType, true)))

// Initialize Spark session
val spark: SparkSession = SparkSession.builder
    .appName("Parquet Converter")
    .getOrCreate

// Create a data frame from a csv file
val dataFrame: DataFrame = spark.read
    .format("csv")
    .schema(schema)
    .option("header", false)
    .load(inputCsvPath)

From what I read, when reading a CSV with Spark using a schema there are 3 options:

  1. Set mode to DROPMALFORMED --> this will drop the lines that don't match the schema
  2. Set mode to PERMISSIVE --> this will set malformed fields to null and keep the corrupted rows (the raw line can be captured in a _corrupt_record column)
  3. Set mode to FAILFAST --> this will throw an exception when a mismatch is discovered
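For reference, each mode is selected through the same `mode` read option. Below is a minimal, self-contained sketch of DROPMALFORMED, assuming a local session; the object name and the two-row file contents are made up for illustration. Note that DROPMALFORMED discards mismatching rows silently, which is why it alone does not report the errors:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.nio.file.Files

object DropMalformedDemo {
  def run(): Long = {
    val spark = SparkSession.builder.appName("DropMalformedDemo").master("local[*]").getOrCreate()

    val schema = StructType(Array(
      StructField("col1", IntegerType, false),
      StructField("col2", StringType, false),
      StructField("col3", StringType, true)))

    // Hypothetical input: the second row's col1 is not an integer
    val path = Files.createTempFile("demo", ".csv")
    Files.write(path, "1,a,b\noops,c,d\n".getBytes)

    val df = spark.read.format("csv")
      .schema(schema)
      .option("header", false)
      .option("mode", "DROPMALFORMED") // silently drops rows that don't match the schema
      .load(path.toString)

    val n = df.count() // only the valid row survives
    spark.stop()
    n
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Running this yields a single-row data frame: the malformed line is gone, but nothing tells you it ever existed.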

What is the best way to combine these options? The behaviour I want is to get the schema mismatches, print them as errors, and ignore those lines in my data frame. Basically, I want a combination of FAILFAST and DROPMALFORMED.

Thanks in advance

asked Nov 08 '22 by Ben Hoffman

1 Answer

This is what I eventually did:
I added a "_corrupt_record" column to the schema, for example:

val schema = StructType(Array(
    StructField("col1", IntegerType, true),
    StructField("col2", StringType, false),
    StructField("col3", StringType, true),
    StructField("_corrupt_record", StringType, true)))

Then I read the CSV using PERMISSIVE mode (it is Spark's default):

val dataFrame: DataFrame = spark.read.format("csv")
                                .schema(schema)
                                .option("header", false)
                                .option("mode", "PERMISSIVE")
                                .load(inputCsvPath)

Now my data frame has an additional column that captures the raw lines with schema mismatches. I filtered out the mismatched rows and printed them:

val badRows = dataFrame.filter("_corrupt_record is not null")
badRows.cache()
badRows.show()
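To also get the clean data frame the question asks for, the bad rows can be dropped after reporting them. Here is a minimal end-to-end sketch of the whole approach, assuming a local session; the object name and two-row file contents are made up for illustration. One caveat worth noting: since Spark 2.3, queries on a raw CSV source that reference only the _corrupt_record column raise an AnalysisException, so the data frame is cached before filtering:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.nio.file.Files

object CorruptRecordDemo {
  def run(): (Long, Long) = {
    val spark = SparkSession.builder.appName("CorruptRecordDemo").master("local[*]").getOrCreate()

    val schema = StructType(Array(
      StructField("col1", IntegerType, true),
      StructField("col2", StringType, false),
      StructField("col3", StringType, true),
      StructField("_corrupt_record", StringType, true)))

    // Hypothetical input: the second row's col1 is not an integer
    val path = Files.createTempFile("demo", ".csv")
    Files.write(path, "1,a,b\noops,c,d\n".getBytes)

    val df = spark.read.format("csv")
      .schema(schema)
      .option("header", false)
      .option("mode", "PERMISSIVE")
      .load(path.toString)
      .cache() // needed in Spark 2.3+ before querying only _corrupt_record

    // Report the mismatches as errors
    val badRows = df.filter("_corrupt_record is not null")
    badRows.select("_corrupt_record").show(false)

    // Keep only the clean rows, without the helper column
    val clean = df.filter("_corrupt_record is null").drop("_corrupt_record")

    val result = (badRows.count(), clean.count())
    spark.stop()
    result
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With the sample input above, one row lands in badRows and one row survives in the clean data frame, which gives the FAILFAST-style reporting plus DROPMALFORMED-style filtering in a single pass.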
answered Nov 14 '22 by Ben Hoffman