How to validate the contents of a Spark DataFrame

I have the Scala Spark code below, which works, but should not.

The second column contains data of mixed types, whereas in the schema I have defined it as IntegerType. My actual program has over 100 columns and keeps deriving multiple child DataFrames after transformations.

How can I validate that the contents of RDD or DataFrame fields have the correct datatypes and values, and then either ignore invalid rows or change the contents of a column to some default value? Any further pointers for data quality checks with DataFrame or RDD are appreciated.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// The second element is sometimes a String and sometimes an Int
val theSeq = Seq(("X01", "41"),
    ("X01", 41),
    ("X01", 41),
    ("X02", "ab"),
    ("X02", "%%"))

val newRdd = sc.parallelize(theSeq)
val rowRdd = newRdd.map(r => Row(r._1, r._2))

val theSchema = StructType(Seq(StructField("ID", StringType, true),
    StructField("Age", IntegerType, true)))
val theNewDF = sqc.createDataFrame(rowRdd, theSchema)
theNewDF.show()
asked Oct 21 '15 by sshroff

1 Answer

First of all, passing a schema is simply a way to avoid type inference. It is not validated or enforced during DataFrame creation. On a side note, I wouldn't describe a ClassCastException as working well; for a moment I thought you had actually found a bug.
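You can see this directly: createDataFrame accepts the mismatched rows without complaint, and the problem only surfaces once an action forces evaluation. A minimal sketch, reusing theSchema from the question (the variable names badRows and badDF are mine):

val badRows = sc.parallelize(Seq(Row("X01", "41")))  // "41" is a String, not an Int
val badDF = sqc.createDataFrame(badRows, theSchema)  // no validation happens here
// badDF.show()  // only now does Spark throw java.lang.ClassCastException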

I think the important question is how you got data like theSeq / newRdd in the first place. Is it something you parse yourself, or is it received from an external component? Simply by looking at the type (Seq[(String, Any)] / RDD[(String, Any)] respectively) you already know it is not valid input for a DataFrame. Probably the right way to handle things at this level is to embrace static typing. Scala provides quite a few neat ways to handle unexpected conditions (Try, Either, Option), of which the last is the simplest and, as a bonus, works well with Spark SQL. A rather simplistic way to handle things could look like this:

def validateInt(x: Any): Option[Int] = x match {
  case x: Int => Some(x)
  case _ => None
}

def validateString(x: Any): Option[String] = x match {
  case x: String => Some(x)
  case _ => None
}

val newRddOption: RDD[(Option[String], Option[Int])] = newRdd.map{
  case (id, age) => (validateString(id), validateInt(age))}
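As a side note, if you also want to salvage values like "41" which are valid integers merely encoded as strings, scala.util.Try gives a compact way to attempt the conversion. This is my own variation, not part of the answer above:

import scala.util.Try

def coerceInt(x: Any): Option[Int] = x match {
  case i: Int => Some(i)
  case s: String => Try(s.trim.toInt).toOption  // "41" -> Some(41), "ab" -> None
  case _ => None
}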

Since Options can be easily composed, you can add additional checks like this:

def validateAge(age: Int) = {
  if(age >= 0 && age < 150) Some(age)
  else None
}

val newRddValidated: RDD[(Option[String], Option[Int])] = newRddOption.map{
  case (id, age) => (id, age.flatMap(validateAge))}
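Applied to theSeq from the question, the validators above produce the following (a trace I worked out by hand, so worth double-checking):

newRddValidated.collect.foreach(println)
// (Some(X01),None)      <- "41" is a String, so validateInt rejects it
// (Some(X01),Some(41))
// (Some(X01),Some(41))
// (Some(X02),None)
// (Some(X02),None)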

Next, instead of Row, which is a very crude container, I would use case classes:

case class Record(id: Option[String], age: Option[Int])

val records: RDD[Record] = newRddValidated.map{case (id, age) => Record(id, age)}

At this point all you have to do is call toDF:

import org.apache.spark.sql.DataFrame
import sqc.implicits._  // required for the rdd.toDF conversion

val df: DataFrame = records.toDF
df.printSchema

// root
//  |-- id: string (nullable = true)
//  |-- age: integer (nullable = true)
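Since invalid values are now represented as plain Options, both remedies from the question, ignoring bad rows or substituting a default, become one-liners. A small sketch (the names validOnly and withDefaults are mine):

// Ignore rows where any field failed validation:
val validOnly: RDD[Record] = records.filter(r => r.id.isDefined && r.age.isDefined)

// Or keep every row but replace a missing age with a default, e.g. -1:
val withDefaults: RDD[Record] = records.map(r => r.copy(age = r.age.orElse(Some(-1))))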

This is the harder but arguably more elegant way. A faster one is to let the SQL casting machinery do the job for you. First, let's convert everything to Strings:

val stringRdd: RDD[(String, String)] = sc.parallelize(theSeq).map(
  p => (p._1.toString, p._2.toString))

Next create a DataFrame:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import sqc.implicits._  // again needed for toDF

val df: DataFrame = stringRdd.toDF("id", "age")

val expectedTypes = Seq(StringType, IntegerType)

// Cast each column to its expected type; values which cannot be cast become null
val exprs: Seq[Column] = df.columns.zip(expectedTypes).map{
  case (c, t) => col(c).cast(t).alias(c)}

val dfProcessed: DataFrame = df.select(exprs: _*)

And the result:

dfProcessed.printSchema

// root
//  |-- id: string (nullable = true)
//  |-- age: integer (nullable = true)


dfProcessed.show

// +---+----+
// | id| age|
// +---+----+
// |X01|  41|
// |X01|  41|
// |X01|  41|
// |X02|null|
// |X02|null|
// +---+----+
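Failed casts show up as null, so the standard DataFrame na functions cover both options asked about in the question, dropping invalid rows or filling in defaults (the names dfDropped and dfFilled are mine):

// Ignore rows where the age cast failed:
val dfDropped: DataFrame = dfProcessed.na.drop(Seq("age"))

// Or substitute a default value instead:
val dfFilled: DataFrame = dfProcessed.na.fill(Map("age" -> -1))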
answered Oct 29 '22 by zero323