I am trying to write some test cases to validate the data between the source (.csv) file and the target (Hive table). One of the validations is the structure validation of the table.
I have loaded the .csv data (using a defined schema) into one dataframe and extracted the Hive table data into another dataframe.
When I now try to compare the schemas of the two dataframes, the comparison returns false. Not sure why. Any idea on this please?
source dataframe schema:
scala> res39.printSchema
root
|-- datetime: timestamp (nullable = true)
|-- load_datetime: timestamp (nullable = true)
|-- source_bank: string (nullable = true)
|-- emp_name: string (nullable = true)
|-- header_row_count: integer (nullable = true)
|-- emp_hours: double (nullable = true)
target dataframe schema:
scala> targetRawData.printSchema
root
|-- datetime: timestamp (nullable = true)
|-- load_datetime: timestamp (nullable = true)
|-- source_bank: string (nullable = true)
|-- emp_name: string (nullable = true)
|-- header_row_count: integer (nullable = true)
|-- emp_hours: double (nullable = true)
When I compare, it returns false:
scala> res39.schema == targetRawData.schema
res47: Boolean = false
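A quick way to see what actually differs beyond the printSchema output is to compare the JSON form of the two schemas, which includes each field's metadata (a diagnostic sketch only, using the same res39 and targetRawData):
scala> println(res39.schema.prettyJson)
scala> println(targetRawData.schema.prettyJson)
scala> res39.schema.json == targetRawData.schema.json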
Data in the two dataframes is shown below:
scala> res39.show
+-------------------+-------------------+-----------+--------+----------------+---------+
| datetime| load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03| RBS| Naveen | 100| 15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03| RBS| Naveen | 100| 115.78|
|2015-04-02 23:24:25|2015-04-02 23:24:25| RBS| Arun | 200| 2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14| RBS| Arun | 100| 30.98|
|2018-06-04 10:11:12|2018-06-04 10:11:12| XZX| Arun | 400| 12.0|
+-------------------+-------------------+-----------+--------+----------------+---------+
scala> targetRawData.show
+-------------------+-------------------+-----------+--------+----------------+---------+
| datetime| load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03| RBS| Naveen| 100| 15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03| RBS| Naveen| 100| 115.78|
|2015-04-02 23:25:25|2015-04-02 23:25:25| RBS| Arun| 200| 2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14| RBS| Arun| 100| 30.98|
+-------------------+-------------------+-----------+--------+----------------+---------+
The complete code is shown below:
//import org.apache.spark
import org.apache.spark.sql.hive._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.text._
import java.util.Date
import scala.util._
import org.apache.spark.sql.hive.HiveContext
//val conf = new SparkConf().setAppName("Simple Application")
//val sc = new SparkContext(conf)
val hc = new HiveContext(sc)
val spark: SparkSession = SparkSession.builder().appName("Simple Application").config("spark.master", "local").getOrCreate()
// set source and target location
val sourceDataLocation = "hdfs://localhost:9000/source.txt"
val targetTableName = "TableA"
// Extract source data
println("Extracting SAS source data from csv file location " + sourceDataLocation);
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val sourceRawCsvData = sc.textFile(sourceDataLocation)
println("Extracting target data from hive table " + targetTableName)
val targetRawData = hc.sql("Select datetime,load_datetime,trim(source_bank) as source_bank,trim(emp_name) as emp_name,header_row_count, emp_hours from " + targetTableName)
// Add the test cases here
// Test 2 - Validate the Structure
val headerColumns = sourceRawCsvData.first().split(",").to[List]
val schema = TableASchema(headerColumns)
val data = sourceRawCsvData.mapPartitionsWithIndex((index, element) => if (index == 0) element.drop(1) else element)
.map(_.split(",").toList)
.map(row)
val dataFrame = spark.createDataFrame(data,schema)
val sourceDataFrame = dataFrame.toDF(dataFrame.columns map(_.toLowerCase): _*)
data.collect
data.getClass
// Test 3 - Validate the data
// Test 4 - Calculate the average and variance of Int or Dec columns
// Test 5 - Test 5
def UpdateResult(tableName: String, returnCode: Int, description: String){
val insertString = "INSERT INTO TestResult VALUES('" + tableName + "', " + returnCode + ",'" + description + "')"
val a = hc.sql(insertString)
}
def TableASchema(columnName: List[String]): StructType = {
StructType(
Seq(
StructField(name = "datetime", dataType = TimestampType, nullable = true),
StructField(name = "load_datetime", dataType = TimestampType, nullable = true),
StructField(name = "source_bank", dataType = StringType, nullable = true),
StructField(name = "emp_name", dataType = StringType, nullable = true),
StructField(name = "header_row_count", dataType = IntegerType, nullable = true),
StructField(name = "emp_hours", dataType = DoubleType, nullable = true)
)
)
}
def row(line: List[String]): Row = {
Row(convertToTimestamp(line(0).trim), convertToTimestamp(line(1).trim), line(2).trim, line(3).trim, line(4).toInt, line(5).toDouble)
}
def convertToTimestamp(s: String) : Timestamp = s match {
case "" => null
case _ => {
val format = new SimpleDateFormat("ddMMMyyyy:HH:mm:ss")
Try(new Timestamp(format.parse(s).getTime)) match {
case Success(t) => t
case Failure(_) => null
}
}
}
Based on @Derek Kaknes's answer, here's the solution I came up with for comparing schemas. It is concerned only with column name, datatype and nullability, and is indifferent to metadata.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, StructField}
def getCleanedSchema(df: DataFrame): Map[String, (DataType, Boolean)] = {
  df.schema.map { (structField: StructField) =>
    structField.name.toLowerCase -> (structField.dataType, structField.nullable)
  }.toMap
}

// Compare relevant information
def getSchemaDifference(schema1: Map[String, (DataType, Boolean)],
                        schema2: Map[String, (DataType, Boolean)]
                       ): Map[String, (Option[(DataType, Boolean)], Option[(DataType, Boolean)])] = {
  (schema1.keys ++ schema2.keys).
    map(_.toLowerCase).
    toList.distinct.
    flatMap { (columnName: String) =>
      val schema1FieldOpt: Option[(DataType, Boolean)] = schema1.get(columnName)
      val schema2FieldOpt: Option[(DataType, Boolean)] = schema2.get(columnName)
      if (schema1FieldOpt == schema2FieldOpt) None
      else Some(columnName -> (schema1FieldOpt, schema2FieldOpt))
    }.toMap
}
The getCleanedSchema method extracts the information of interest (column datatype and nullability) and returns a map of column name to that tuple. The getSchemaDifference method returns a map containing only those columns that differ between the two schemas. If a column is absent in one of the two schemas, its corresponding properties will be None.
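For illustration, a minimal usage sketch, assuming the two dataframes from the question (res39 and targetRawData) are in scope:

val sourceSchema = getCleanedSchema(res39)
val targetSchema = getCleanedSchema(targetRawData)
val schemaDiff = getSchemaDifference(sourceSchema, targetSchema)

if (schemaDiff.isEmpty) println("Schemas match on name, datatype and nullability")
else schemaDiff.foreach { case (col, (src, tgt)) =>
  println(s"$col differs: source=$src, target=$tgt")
}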
I've just had the exact same problem. When you read data from Hive, the schema's StructField components will sometimes contain Hive metadata in the field's metadata attribute.
You can't see it when printing the schemas, because this field is not part of the toString definition.
Here is the solution I've decided to use: I simply take a copy of the schema with empty metadata before comparing it:
schema.map(_.copy(metadata = Metadata.empty))
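Applied to the two dataframes from the question, a minimal sketch (assuming res39 and targetRawData are in scope) could look like this:

import org.apache.spark.sql.types.Metadata

// Drop per-field metadata so that only name, datatype and nullability are compared
val sourceFields = res39.schema.map(_.copy(metadata = Metadata.empty))
val targetFields = targetRawData.schema.map(_.copy(metadata = Metadata.empty))

sourceFields == targetFields  // true when the cleaned schemas are identical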