We are loading hierarchies of directories of files with Spark and converting them to Parquet. There are tens of gigabytes in hundreds of pipe-separated files, and some of the individual files are quite large.
Roughly one file in a hundred has a row or two with an extra delimiter, which makes the whole process (or that file) abort.
We are loading using:
sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", format("header"))
    .option("delimiter", format("delimeter"))
    .option("quote", format("quote"))
    .option("escape", format("escape"))
    .option("charset", "UTF-8")
    // Column types are unnecessary for our current use cases.
    //.option("inferschema", "true")
    .load(glob)
Is there any extension or event-handling mechanism in Spark that we could attach to the logic that reads rows, so that when a malformed row is encountered it just skips the row instead of failing the whole process?
(We are planning to do more pre-processing, but this would be the most immediate and critical fix.)
In your case it may not be the Spark parsing itself that fails. The default parse mode is PERMISSIVE, so the parser makes a best-effort attempt on a malformed row and produces a record that then causes problems further downstream in your processing logic.
You should be able to simply add the option:
.option("mode", "DROPMALFORMED")
like this:
sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", format("header"))
    .option("delimiter", format("delimeter"))
    .option("quote", format("quote"))
    .option("escape", format("escape"))
    .option("charset", "UTF-8")
    // Column types are unnecessary for our current use cases.
    //.option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load(glob)
and it will skip lines that have the wrong number of delimiters or that don't match the schema, rather than letting them cause errors later in the code.
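If you want to see which rows a field-count check would reject before relying on the parser to drop them silently, a rough pre-check in plain Scala might look like the sketch below. It is not how Spark or spark-csv decides what is malformed: it simply compares each row's field count with the header's, and it ignores quoting and escaping entirely. The names `FieldCountCheck`, `fieldCount` and `partitionRows` are hypothetical and exist only for this illustration.

```scala
import java.util.regex.Pattern

// Illustrative sketch only (hypothetical names, not part of Spark).
// Assumption: fields never contain the delimiter inside quotes; a quoted
// field that does contain it would be miscounted by this naive split.
object FieldCountCheck {

  // Count fields by splitting on the literal delimiter.
  // The -1 limit keeps trailing empty fields, so "a|b|" counts as 3.
  def fieldCount(line: String, delimiter: Char): Int =
    line.split(Pattern.quote(delimiter.toString), -1).length

  // Treat the first line as the header and split the remaining rows into
  // (rows matching the header's field count, rows that do not match).
  def partitionRows(lines: Seq[String], delimiter: Char): (Seq[String], Seq[String]) =
    lines match {
      case header +: rows =>
        val expected = fieldCount(header, delimiter)
        rows.partition(row => fieldCount(row, delimiter) == expected)
      case _ =>
        (Seq.empty, Seq.empty)
    }

  def main(args: Array[String]): Unit = {
    // Made-up sample data: the second data row has one extra delimiter.
    val sample = Seq(
      "id|name|city",
      "1|Ann|Oslo",
      "2|Bob|extra|Rome",
      "3|Cy|Lima"
    )
    val (kept, rejected) = partitionRows(sample, '|')
    println(s"kept ${kept.size}, rejected ${rejected.size}")
    rejected.foreach(row => println(s"rejected: $row"))
  }
}
```

Running something like this over a sample of the input gives a list of the offending rows, which can help confirm that the dropped rows really are the ones with the extra delimiter.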