Spark SQL - reading CSV with schema

I encountered a problem while trying to use Spark for simply reading a CSV file. After such an operation I would like to ensure that:

  • the data types are correct (using the provided schema)
  • the headers are correct against the provided schema

That's the code I use and have problems with:

val schema = Encoders.product[T].schema
val df = spark.read
  .schema(schema)
  .option("header", "true")
  .csv(fileName)

The type T is a Product, i.e. a case class. This works, but it doesn't check whether the column names are correct: I can supply a different file and, as long as the data types are correct, no error occurs, leaving me unaware that the user provided the wrong file that just happens to have the right data types in the right order.
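For illustration, a minimal sketch of the issue (the case class Person, the file name people.csv, and the mismatched header are all made up for this example):

import org.apache.spark.sql.{Encoders, SparkSession}

// hypothetical example type standing in for T
case class Person(name: String, age: Int)

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val schema = Encoders.product[Person].schema

// Suppose people.csv has the header "firstName,yearsOld" instead of "name,age".
// Since an explicit schema is supplied, Spark only skips the header line; it never
// compares the header names to the schema, so this read succeeds silently.
val df = spark.read
  .schema(schema)
  .option("header", "true")
  .csv("people.csv")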

I also tried using the option that infers the schema and then calling the .as[T] method on the Dataset, but when any non-String column contains only nulls, Spark interprets it as a String column, while in my schema it is an Integer. So a cast exception occurs, although the column names have been checked correctly.
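That second attempt looked roughly like this (reusing the hypothetical Person and people.csv from above):

import spark.implicits._

// Infer the schema from the file and then bind it to the case class.
// .as[Person] does verify the column names, but a column containing only
// nulls is inferred as StringType, so binding it to the Int field is
// rejected with the cast exception described above.
val ds = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("people.csv")
  .as[Person]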

To summarize: I found one solution that ensures correct data types but not headers, and another that validates headers but has problems with data types. Is there any way to achieve both, i.e. complete validation of headers and types?

I am using Spark 2.2.0.

Asked Oct 13 '17 08:10 by Hooberd



1 Answer

Looks like you'll have to do it yourself by reading the file header twice.

Looking at Spark's code, the inferred header is completely ignored (never actually read) if a user supplies their own schema, so there's no way of making Spark fail on such an inconsistency.

To perform this comparison yourself:

import org.apache.spark.sql.Encoders

val schema = Encoders.product[T].schema

// read the actual schema; This shouldn't be too expensive as Spark's
// laziness would avoid actually reading the entire file 
val fileSchema = spark.read
  .option("header", "true")
  .csv("test.csv").schema

// read the file using your own schema. You can later use this DF
val df = spark.read.schema(schema)
  .option("header", "true")
  .csv("test.csv")

// compare actual and expected column names:
val badColumnNames = fileSchema.fields.map(_.name)
  .zip(schema.fields.map(_.name))
  .filter { case (actual, expected) => actual != expected }

// fail if any inconsistency found:
assert(badColumnNames.isEmpty, 
  s"file schema does not match expected; Bad column names: ${badColumnNames.mkString("; ")}")
Answered Sep 28 '22 09:09 by Tzach Zohar