The Scala documentation for Spark's DataFrameReader.csv suggests that Spark can log the malformed rows it detects while reading a .csv file.
- How can one log the malformed rows?
- Can one obtain a val or var containing the malformed rows?
The relevant option from the linked documentation is maxMalformedLogPerPartition (default 10): "sets the maximum number of malformed rows Spark will log for each partition. Malformed records beyond this number will be ignored."
Based on this Databricks example, you need to explicitly add the "_corrupt_record" column to the schema definition when you read the file. Something like this worked for me in PySpark 2.4.4:
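from pyspark.sql.types import StructType, StructField, StringType

# The schema must list "_corrupt_record" explicitly so Spark can fill it in.
my_schema = StructType([
    StructField("field1", StringType(), True),
    ...
    StructField("_corrupt_record", StringType(), True)
])

# `spark` is the active SparkSession (e.g. from the pyspark shell).
my_data = spark.read.format("csv")\
    .option("path", "/path/to/file.csv")\
    .schema(my_schema)\
    .load()

my_data.count()  # force reading the csv

# Rows that failed to parse keep their raw text in "_corrupt_record".
corrupt_lines = my_data.filter("_corrupt_record is not NULL")
corrupt_lines.take(5)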