 

How to drop malformed rows while reading csv with schema Spark?

I am using a Spark Dataset to load a CSV file, and I prefer to specify the schema explicitly. However, a few rows do not comply with my schema: one column should be a double, but some rows contain non-numeric values. Is there an easy way to filter out all rows that do not comply with the schema?

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

val schema = StructType(StructField("col", DataTypes.DoubleType) :: Nil)
val ds = spark.read.format("csv").option("delimiter", "\t").schema(schema).load("f.csv")

f.csv:

a
1.0

I would like the row "a" to be filtered out of my Dataset easily. Thanks!

HouZhe asked Apr 09 '18 02:04




2 Answers

If you are reading a CSV file and want to drop the rows that do not match the schema, you can do so by setting the mode option to DROPMALFORMED.

Input data

a,1.0
b,2.2
c,xyz
d,4.5
e,asfsdfsdf
f,3.1

Schema

import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("key", StringType, false),
  StructField("value", DoubleType, false)
))

Read the CSV file with the schema and the mode option:

  val df = spark.read.schema(schema)
    .option("mode", "DROPMALFORMED")
    .csv("/path to csv file ")

Output:

+---+-----+
|key|value|
+---+-----+
|a  |1.0  |
|b  |2.2  |
|d  |4.5  |
|f  |3.1  |
+---+-----+
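To try this without a file on disk, the same input can be fed in as an in-memory `Dataset[String]`. This is a minimal sketch, assuming Spark 2.2 or later (where `DataFrameReader.csv(Dataset[String])` exists) and running in spark-shell or as a Scala script; the session settings and `appName` are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Local session for the sketch; in spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[1]").appName("drop-malformed-sketch").getOrCreate()
import spark.implicits._

// The answer's input lines, held in memory instead of a file.
val lines = Seq("a,1.0", "b,2.2", "c,xyz", "d,4.5", "e,asfsdfsdf", "f,3.1").toDS()

val schema = StructType(Seq(
  StructField("key", StringType, false),
  StructField("value", DoubleType, false)
))

// DROPMALFORMED discards rows whose "value" cannot be parsed as a double.
val df = spark.read
  .schema(schema)
  .option("mode", "DROPMALFORMED")
  .csv(lines)

df.show(false)
```

One caveat, per the Spark 2.4 migration notes: when reading from files, a row is treated as malformed only if a column the query actually requests fails to parse, so a query that selects only `key` (or a bare `count()`) may still see rows like `c,xyz`.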

You can find more details in the spark-csv documentation.

Hope this helps!

koiralo answered Oct 06 '22 23:10


.option("mode", "DROPMALFORMED") should do the work.
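Applied to the question's tab-delimited `f.csv`, the option slots straight into the original read chain. A minimal sketch, assuming Spark 2.x or later on the classpath and running in spark-shell or as a Scala script; the temp-file setup is only there to recreate the two-line file from the question, and its name is illustrative.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

val spark = SparkSession.builder().master("local[1]").appName("question-file-sketch").getOrCreate()

// Recreate the question's f.csv (rows "a" and "1.0") in a temporary file.
val path = Files.createTempFile("f", ".csv")
Files.write(path, "a\n1.0\n".getBytes(StandardCharsets.UTF_8))

val schema = StructType(StructField("col", DataTypes.DoubleType) :: Nil)

// Same read as in the question, plus the mode option.
val ds = spark.read
  .format("csv")
  .option("delimiter", "\t")
  .option("mode", "DROPMALFORMED") // drop rows that fail to parse against the schema
  .schema(schema)
  .load(path.toString)

ds.show()
```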

mode (default PERMISSIVE): allows a mode for dealing with corrupt records during parsing.

  • PERMISSIVE : sets other fields to null when it meets a corrupted record, and puts the malformed string into a field configured by columnNameOfCorruptRecord. To keep the malformed string, the user-defined schema must include a string field with that name; otherwise it is not kept. When a schema is set by the user, it sets null for extra fields.

  • DROPMALFORMED : ignores whole corrupted records.

  • FAILFAST : throws an exception when it meets corrupted records.
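If you also want to see which rows were rejected, PERMISSIVE mode with a corrupt-record column lets you split good and bad rows yourself instead of discarding them silently. A minimal sketch, assuming Spark 2.3 or later, an in-memory copy of the first answer's input, and the conventional column name `_corrupt_record`; the `.cache()` call follows the Spark documentation's workaround for queries that reference only the corrupt-record column.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[1]").appName("permissive-sketch").getOrCreate()
import spark.implicits._

val lines = Seq("a,1.0", "b,2.2", "c,xyz", "d,4.5", "e,asfsdfsdf", "f,3.1").toDS()

// The corrupt-record column must be a nullable StringType field in the user schema.
val schema = StructType(Seq(
  StructField("key", StringType),
  StructField("value", DoubleType),
  StructField("_corrupt_record", StringType)
))

val parsed = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .csv(lines)
  .cache() // lets later queries reference only _corrupt_record

// Rows that parsed cleanly, without the helper column.
val good = parsed.filter(col("_corrupt_record").isNull).drop("_corrupt_record")
// The raw text of the rows that did not match the schema.
val bad = parsed.filter(col("_corrupt_record").isNotNull).select("_corrupt_record")

good.show(false)
bad.show(false)
```

Compared with DROPMALFORMED, this costs an extra column and a cache, but the rejected lines stay available for logging or repair.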

Amit Kulkarni answered Oct 07 '22 01:10