I'm trying to use the PySpark CSV reader to capture corrupted records in a dedicated column. Here is what I have tried.
file: ab.csv
------
a,b
1,2
3,four
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
DDL = "a INTEGER, b INTEGER"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=False,
                    columnNameOfCorruptRecord='broken')
df.show()
Output:
+----+----+
| a| b|
+----+----+
| 1| 2|
|null|null|
+----+----+
This command does not store the corrupted records. If I add broken to the schema and disable header validation, the command works with a warning.
DDL = "a INTEGER, b INTEGER, broken STRING"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=True,
                    columnNameOfCorruptRecord='broken')
df.show()
Output:
WARN CSVDataSource:66 - Number of column in CSV header is not equal to number of fields in the schema:
Header length: 2, schema size: 3
CSV file: file:/// ... /ab.csv
+----+----+------+
| a| b|broken|
+----+----+------+
| 1| 2| null|
|null|null|3,four|
+----+----+------+
Is this intended behavior or is there a bug that breaks the first example? Is there a better way to do this?
One more thing: I want to recover the well-formed fields of corrupted records, to get a DataFrame like this.
+--+----+------+
| a| b|broken|
+--+----+------+
| 1| 2| null|
| 3|null|3,four|
+--+----+------+
Should I add an extra post-read step to get that, or is there some option I have missed that is more permissive?
Like @deo said, when you use columnNameOfCorruptRecord, Spark implicitly creates that column and then drops it during parsing. To keep the column, you need to add it explicitly to your schema. Note that this behavior also depends on the mode you specify when reading.
See the description of the mode param in the Spark documentation:
PERMISSIVE : when it meets a corrupted record, puts the malformed string into a field configured by columnNameOfCorruptRecord, and sets other fields to null. To keep corrupt records, an user can set a string type field named columnNameOfCorruptRecord in an user-defined schema. If a schema does not have the field, it drops corrupt records during parsing. A record with less/more tokens than schema is not a corrupted record to CSV. When it meets a record having fewer tokens than the length of the schema, sets null to extra fields. When the record has more tokens than the length of the schema, it drops extra tokens.
That's the correct default behavior. If you infer the schema, Spark implicitly adds a columnNameOfCorruptRecord field to the output schema; otherwise you have to provide a string-type field with that name in your user-defined schema, or pick a different column name (like broken) and add that same name to the schema.
There is no option to process the data partially as you described; for that to happen through the reader itself, you would need to write your own custom parser extending CSVFileFormat in Spark. For a list of all CSV options, check org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala