
Pyspark read csv with schema, header check, and store corrupt records

I'm trying to use pyspark csv reader with the following criteria:

  • Read csv according to datatypes in schema
  • Check that the column names in the header and the schema match
  • Store broken records in a new field

Here is what I have tried.

file: ab.csv
------
a,b
1,2
3,four

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
DDL = "a INTEGER, b INTEGER"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=False,
                    columnNameOfCorruptRecord='broken')
df.show()

Output:

+----+----+
|   a|   b|
+----+----+
|   1|   2|
|null|null|
+----+----+

This command does not store the corrupted records. If I add broken to the schema and drop the header validation (enforceSchema=True), the command works but emits a warning.

DDL = "a INTEGER, b INTEGER, broken STRING"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=True, 
                    columnNameOfCorruptRecord='broken')
df.show()

Output:

WARN  CSVDataSource:66 - Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 2, schema size: 3
CSV file: file:/// ... /ab.csv
+----+----+------+
|   a|   b|broken|
+----+----+------+
|   1|   2|  null|
|null|null|3,four|
+----+----+------+

Is this intended behavior or is there a bug that breaks the first example? Is there a better way to do this?

One more thing: I want to process the well-formed fields of corrupted records to get a dataframe like this.

+--+----+------+
| a|   b|broken|
+--+----+------+
| 1|   2|  null|
| 3|null|3,four|
+--+----+------+

Should I add an extra post-processing step after reading to get that, or is there some more permissive option I have missed?

Asked Mar 07 '19 by Foldager

2 Answers

As @deo said, when you use columnNameOfCorruptRecord, Spark implicitly creates that column during parsing and then drops it. To keep the column, you need to add it explicitly to your schema. Note that this behavior also depends on the mode you specify when reading.

See this excerpt on the mode parameter in the Spark documentation:

PERMISSIVE : when it meets a corrupted record, puts the malformed string into a field configured by columnNameOfCorruptRecord, and sets other fields to null. To keep corrupt records, an user can set a string type field named columnNameOfCorruptRecord in an user-defined schema. If a schema does not have the field, it drops corrupt records during parsing. A record with less/more tokens than schema is not a corrupted record to CSV. When it meets a record having fewer tokens than the length of the schema, sets null to extra fields. When the record has more tokens than the length of the schema, it drops extra tokens.
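For illustration, here is a minimal sketch against the ab.csv file from the question, spelling out the default PERMISSIVE mode and declaring the corrupt-record field in the user-defined schema:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Keep the corrupt-record column by declaring it in the user-defined schema.
DDL = "a INTEGER, b INTEGER, broken STRING"

df = (spark.read
      .option("mode", "PERMISSIVE")  # the default mode, made explicit here
      .option("columnNameOfCorruptRecord", "broken")
      .csv("ab.csv", header=True, schema=DDL))

df.show()

This produces the same dataframe as the second snippet in the question; since enforceSchema defaults to true, the extra broken field only triggers the header-length warning.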

Answered Oct 12 '22 by sahibeast

That's the correct default behavior. If you let Spark infer the schema, it implicitly adds a columnNameOfCorruptRecord field to the output schema; otherwise you have to provide a string-type field named columnNameOfCorruptRecord in a user-defined schema, or change the column name (for example to broken) and add that same name to the schema.

There is no option to process the data partially as you describe; for that you would need to write your own custom parser extending CSVFileFormat in Spark. For a list of all CSV options, see org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala.
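That said, a post-read step can recover the well-formed tokens for a simple file like this one. A rough sketch, assuming df is the dataframe from the question's second snippet (the schema that includes broken) and that fields never contain quoted or embedded commas:

from pyspark.sql import functions as F

# Re-split the raw record stored in 'broken' and cast each token to the
# column's type; fall back to the already-parsed value where it exists.
parts = F.split(F.col("broken"), ",")

fixed = (df
         .withColumn("a", F.coalesce(F.col("a"), parts.getItem(0).cast("int")))
         .withColumn("b", F.coalesce(F.col("b"), parts.getItem(1).cast("int"))))

fixed.show()

This gives the dataframe the question asks for: a is recovered as 3, b stays null because "four" does not cast to an integer, and broken keeps the raw record.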

Answered Oct 11 '22 by deo