
How to log malformed rows from Scala Spark DataFrameReader csv

The documentation for Scala Spark's DataFrameReader.csv suggests that Spark can log the malformed rows it detects while reading a .csv file.
- How can one log the malformed rows?
- Can one obtain a val or var containing the malformed rows?

The relevant option from the linked documentation is: maxMalformedLogPerPartition (default 10): sets the maximum number of malformed rows Spark will log for each partition. Malformed records beyond this number will be ignored.
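For context, I am setting that option like any other DataFrameReader option; a minimal sketch of what I am doing (the path is a placeholder and spark is an existing SparkSession):

val df = spark.read
  .option("maxMalformedLogPerPartition", "10")
  .csv("/path/to/file.csv")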

asked Jan 27 '17 by teserecter

1 Answer

Based on this Databricks example, you need to explicitly add a "_corrupt_record" column to your schema definition when you read in the file. Something like this worked for me in PySpark 2.4.4:

from pyspark.sql.types import *

my_schema = StructType([
  StructField("field1", StringType(), True),
  ...
  # the corrupt-record column must be declared explicitly in the schema
  StructField("_corrupt_record", StringType(), True)
])

my_data = spark.read.format("csv")\
  .option("path", "/path/to/file.csv")\
  .schema(my_schema)
  .load()

my_data.count()  # force reading the csv

corrupt_lines = my_data.filter("_corrupt_record is not NULL")
corrupt_lines.take(5)
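Since the question asks about Scala, here is a rough Scala equivalent of the same approach. This is only a sketch: the field names and path are placeholders, and the cache() call is there because newer Spark versions restrict queries that reference only the internal corrupt-record column on an uncached DataFrame.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().appName("corrupt-csv").getOrCreate()

// As in the PySpark version, the corrupt-record column must be declared
// explicitly in the schema; the other fields are placeholders.
val mySchema = StructType(Seq(
  StructField("field1", StringType, nullable = true),
  StructField("field2", StringType, nullable = true),
  StructField("_corrupt_record", StringType, nullable = true)
))

val myData = spark.read
  .format("csv")
  .option("path", "/path/to/file.csv")
  .schema(mySchema)
  .load()

myData.cache()   // materialize so the corrupt-record column can be queried on its own
myData.count()   // force reading the csv

val corruptLines = myData.filter(col("_corrupt_record").isNotNull)
corruptLines.show(5, truncate = false)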
answered Oct 02 '22 by klucar