I am reading a CSV file using Spark in Scala. The schema is predefined and I am using it for reading. This is the example code:
// imports needed by the snippets below
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// create the schema
val schema = StructType(Array(
  StructField("col1", IntegerType, nullable = false),
  StructField("col2", StringType, nullable = false),
  StructField("col3", StringType, nullable = true)))
// Initialize the Spark session
val spark: SparkSession = SparkSession.builder
  .appName("Parquet Converter")
  .getOrCreate()

// Create a data frame from a CSV file
val dataFrame: DataFrame = spark.read
  .format("csv")
  .schema(schema)
  .option("header", false)
  .load(inputCsvPath)
From what I read, when reading a CSV with Spark using a schema there are three parser modes:

DROPMALFORMED --> this will drop the lines that don't match the schema
PERMISSIVE --> this will set the fields of a malformed line to null and keep the line (this is the default)
FAILFAST --> this will throw an exception when a mismatch is discovered

What is the best way to combine the options? The behaviour I want is to get the schema mismatches, print them as errors, and ignore those lines in my data frame. Basically, I want a combination of FAILFAST and DROPMALFORMED. For reference, each mode is set through the reader's "mode" option, as in the sketch below.
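A minimal sketch of setting the two non-default modes (the val names are just for illustration; same schema and inputCsvPath as above):

// drop every line that does not match the schema
val droppedMalformed = spark.read.format("csv")
  .schema(schema)
  .option("mode", "DROPMALFORMED")
  .load(inputCsvPath)

// fail the whole read as soon as one malformed line is found
val failFast = spark.read.format("csv")
  .schema(schema)
  .option("mode", "FAILFAST")
  .load(inputCsvPath)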
Thanks in advance
This is what I eventually did:
I added the "_corrupt_record" column to the schema, for example:
val schema = StructType(Array(
  StructField("col1", IntegerType, nullable = true),
  StructField("col2", StringType, nullable = false),
  StructField("col3", StringType, nullable = true),
  StructField("_corrupt_record", StringType, nullable = true)))
Then I read the CSV using PERMISSIVE mode (the Spark default):
val dataFrame: DataFrame = spark.read.format("csv")
.schema(schema)
.option("header", false)
.option("mode", "PERMISSIVE")
.load(inputCsvPath)
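As an aside, "_corrupt_record" is just Spark's default name for this column (taken from the spark.sql.columnNameOfCorruptRecord setting); if that name clashes with your data, it can be renamed per read with the columnNameOfCorruptRecord option. A minimal sketch, where "bad_record", dataFrameRenamed and schemaWithBadRecordColumn are illustrative names of my own:

val dataFrameRenamed = spark.read.format("csv")
  .schema(schemaWithBadRecordColumn)  // same schema as above, but the extra column is named "bad_record"
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "bad_record")
  .load(inputCsvPath)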
Now my data frame holds an additional column containing the raw text of every row with a schema mismatch. I filtered the rows that have mismatched data and printed them:
val badRows = dataFrame.filter("_corrupt_record is not null")
// cache before the action; newer Spark versions (2.3+) disallow queries over raw CSV
// that reference only the corrupt-record column, and caching is the suggested workaround
badRows.cache()
badRows.show()
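To then actually ignore those lines in the data frame, as asked above, one can keep only the clean rows and drop the helper column afterwards; a minimal sketch (goodRows is an illustrative name):

val goodRows = dataFrame
  .filter("_corrupt_record is null")
  .drop("_corrupt_record")
goodRows.show()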