I encountered a problem while trying to use Spark to simply read a CSV file. After that operation I would like to ensure that the data types are correct (using a provided schema) and that the headers are correct with respect to that schema.
This is the code I use and have problems with:
val schema = Encoders.product[T].schema
val df = spark.read
.schema(schema)
.option("header", "true")
.csv(fileName)
The type T is a Product, i.e. a case class. This works, but it doesn't check whether the column names are correct. So I can pass another file, and as long as the data types are correct no error occurs, and I remain unaware that the user provided the wrong file which, by coincidence, has the correct data types in the proper order.
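For example (a minimal sketch; Person and the file names are made-up for illustration, they are not from my actual code):

case class Person(name: String, age: Int)

import org.apache.spark.sql.{Encoders, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val schema = Encoders.product[Person].schema

// Suppose "wrong.csv" has the header "city,population" but compatible types.
// Spark reads it with the Person schema and never flags the header mismatch.
val df = spark.read
  .schema(schema)
  .option("header", "true")
  .csv("wrong.csv")

df.show() // rows load silently as if they were Person records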
I tried using the option that infers the schema and then calling the .as[T] method on the Dataset, but when any column other than a String column contains only nulls, Spark infers it as a String column, whereas in my schema it is an Integer. A cast exception then occurs, even though the column names were checked correctly.
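For reference, this is roughly that variant (a sketch, reusing the hypothetical Person case class and file name from above):

import spark.implicits._ // needed for the Encoder[Person]

val ds = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("people.csv")
  .as[Person] // validates column names, but fails with a cast/up-cast error
              // when an all-null column was inferred as StringType instead of IntegerType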
To summarize: I found a solution that ensures correct data types but does not validate the headers, and another solution that validates the headers but has problems with the data types. Is there any way to achieve both, i.e. complete validation of headers and types?
I am using Spark 2.2.0.
Spark has built-in support for reading CSV files. We can use spark.read, which will read the CSV data and return a DataFrame: call the CSV reader and pass it the path to the CSV file. There are other generic ways to read CSV files as well; you can use either method.
Although Spark identifies column data types correctly in most cases, for production workloads it is recommended to pass a custom schema while reading the file, so that the data type of each column is known up front. We can do that using Spark's StructType and StructField.
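For example (a small sketch; the file and column names are hypothetical):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Explicit schema: skip inference and enforce these column types.
val customSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

val peopleDf = spark.read
  .schema(customSchema)
  .option("header", "true")
  .csv("people.csv")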
PySpark provides csv("path") on DataFrameReader to read a CSV file into a PySpark DataFrame, and dataframeObj.write.csv("path") to save or write a DataFrame to a CSV file.
However, in big data technologies like HDFS, Data Lake, etc., you can load the file without a schema and read it directly into a compute engine like Spark for processing. For instance, in Azure Databricks (Spark), a file such as InjuryRecord_withoutdate.csv can be loaded into the Databricks File System and read from there.
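A rough Scala counterpart of those read and write calls might look like this (the DBFS paths are hypothetical):

// Read a CSV from DBFS into a DataFrame, then write it back out as CSV.
val injuries = spark.read
  .option("header", "true")
  .csv("/FileStore/tables/InjuryRecord_withoutdate.csv")

injuries.write
  .option("header", "true")
  .mode("overwrite")
  .csv("/FileStore/tables/injury_output")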
Looks like you'll have to do it yourself by reading the file header twice.
Looking at Spark's code, the inferred header is completely ignored (never actually read) if a user supplies their own schema, so there's no way of making Spark fail on such an inconsistency.
To perform this comparison yourself:
val schema = Encoders.product[T].schema
// read the actual schema; This shouldn't be too expensive as Spark's
// laziness would avoid actually reading the entire file
val fileSchema = spark.read
.option("header", "true")
.csv("test.csv").schema
// read the file using your own schema. You can later use this DF
val df = spark.read.schema(schema)
.option("header", "true")
.csv("test.csv")
// compare actual and expected column names:
val badColumnNames = fileSchema.fields.map(_.name)
.zip(schema.fields.map(_.name))
.filter { case (actual, expected) => actual != expected }
// fail if any inconsistency found:
assert(badColumnNames.isEmpty,
s"file schema does not match expected; Bad column names: ${badColumnNames.mkString("; ")}")