 

How to access "_corrupt_record" column in pyspark?

I am having some problems handling bad records and files (CSV). Here is my CSV file:

+------+---+---+----+
|  Name| ID|int|int2|
+------+---+---+----+
| Sohel|  1|  4|  33|
| Sohel|  2|  5|  56|
| Sohel|  3|  6| 576|
| Sohel|  a|  7| 567|
|Sohel2|  c|  7| 567|
+------+---+---+----+

I am reading this file with a predefined schema:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("Name", StringType(), True),
    StructField("ID", IntegerType(), True),
    StructField("int", IntegerType(), True),
    StructField("int2", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True)
])
df = spark.read.csv('dbfs:/tmp/test_file/test_csv.csv', header=True, schema=schema,
                    columnNameOfCorruptRecord='_corrupt_record')

And the result is:

+------+----+---+----+---------------+
|  Name|  ID|int|int2|_corrupt_record|
+------+----+---+----+---------------+
| Sohel|   1|  4|  33|           null|
| Sohel|   2|  5|  56|           null|
| Sohel|   3|  6| 576|           null|
| Sohel|null|  7| 567|  Sohel,a,7,567|
|Sohel2|null|  7| 567| Sohel2,c,7,567|
+------+----+---+----+---------------+

It gives me the result I expected, but the problem starts here: I just want to access those "_corrupt_record" rows and make a new DataFrame. I filtered df on "_corrupt_record", but it seems the original CSV file does not have a "_corrupt_record" column, which is why it gives me an error.

badRows = df.filter("_corrupt_record is Not Null").show()

Error message:

Error while reading file dbfs:/tmp/test_file/test_csv.csv.
Caused by: java.lang.IllegalArgumentException: _corrupt_record does not exist. Available: Name, ID, int, int2

I am following the Databricks documentation, https://docs.databricks.com/data/data-sources/read-csv.html#read-files , but it hits the same error, so why did they even put it in the document?!

All I want is to access the "_corrupt_record" column and make a new DataFrame. Any help or suggestion would be appreciated.

asked Jan 19 '26 by Sohel Reza


1 Answer

You need to cache the DataFrame beforehand to use _corrupt_record. Please refer to: Not able to retain the corrupted rows in pyspark using PERMISSIVE mode
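A minimal sketch of that fix, assuming the same file path and schema as in the question: Spark lazily re-reads the CSV with only the referenced columns when you query _corrupt_record, so caching the DataFrame first keeps the corrupt-record column available for filtering.

# Read with the same explicit schema and corrupt-record column as in the question.
df = spark.read.csv(
    'dbfs:/tmp/test_file/test_csv.csv',
    header=True,
    schema=schema,
    columnNameOfCorruptRecord='_corrupt_record'
)

# Cache (materialize) the DataFrame so the _corrupt_record column survives
# instead of being dropped when Spark re-reads the file for the filter query.
df.cache()

# Now the corrupt rows can be pulled into their own DataFrame.
badRows = df.filter(df._corrupt_record.isNotNull())
badRows.show()

# Optionally release the cache once the bad rows have been extracted:
# df.unpersist()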

answered Jan 21 '26 by SR77