 

Spark apply custom schema to a DataFrame

I have data in a Parquet file and want to apply a custom schema to it.

The initial schema of my data in Parquet is as below:

root
 |-- CUST_ID: decimal(9,0) (nullable = true)
 |-- INACTV_DT: string (nullable = true)
 |-- UPDT_DT: string (nullable = true)
 |-- ACTV_DT: string (nullable = true)
 |-- PMT_AMT: decimal(9,4) (nullable = true)
 |-- CMT_ID: decimal(38,14) (nullable = true)

My custom schema is as below:

root
 |-- CUST_ID: decimal(38,0) (nullable = false)
 |-- INACTV_DT: timestamp (nullable = false)
 |-- UPDT_DT: timestamp (nullable = false)
 |-- ACTV_DT: timestamp (nullable = true)
 |-- PMT_AMT: decimal(19,4) (nullable = true)
 |-- CMT_ID: decimal(38,14) (nullable = false)

Below is my code to apply the new schema to the DataFrame:

val customSchema = getOracleDBSchema(sparkSession, QUERY).schema
val DF_frmOldParquet = sparkSession.read.parquet("src/main/resources/data_0_0_0.parquet")
val rows: RDD[Row] = DF_frmOldParquet.rdd
val newDataFrame = sparkSession.sqlContext.createDataFrame(rows, customSchema)
newDataFrame.printSchema()
newDataFrame.show()

I am getting the below error when I perform this operation:

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of timestamp
staticinvoke(class org.apache.spark.sql.types.Decimal$, DecimalType(38,0), fromDecimal, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, CUST_ID), DecimalType(38,0)), true) AS CUST_ID#27
asked by rajcool111

1 Answer

There are two main applications of a schema in Spark SQL:

  • The schema argument passed to the schema method of the DataFrameReader, which is used to transform data in some formats (primarily plain-text files). In this case the schema can be used to automatically cast input records (sketched below).
  • The schema argument passed to createDataFrame (the variants which take an RDD or a List of Rows) of the SparkSession. In this case the schema has to conform to the data, and is not used for casting.
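
For illustration, a minimal self-contained sketch of both usages (the file path and the sample values here are hypothetical):

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val schema = StructType(Seq(
  StructField("CUST_ID", DecimalType(38, 0), nullable = false),
  StructField("ACTV_DT", TimestampType, nullable = true)
))

// 1. Reader schema: for plain-text sources the schema drives parsing,
//    so string input is cast to the declared types on read.
val fromCsv = spark.read.schema(schema).csv("src/main/resources/data.csv")

// 2. createDataFrame schema: the Rows must already contain values of
//    exactly these external types; nothing is cast.
val rows = spark.sparkContext.parallelize(Seq(
  Row(new java.math.BigDecimal(1), java.sql.Timestamp.valueOf("2017-10-17 13:10:00"))
))
val fromRows = spark.createDataFrame(rows, schema)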

Neither of the above is applicable in your case:

  • The input is strongly typed, so the schema, if present, is ignored by the reader.

  • The schema doesn't match the data, so it cannot be used with createDataFrame.

In this scenario you should cast each column to the desired type. Assuming the types are compatible, something like this should work:

// Fold over the fields of the target schema, casting each column
// of the original DataFrame to the corresponding desired type.
val newDataFrame = customSchema.fields.foldLeft(DF_frmOldParquet) {
  (df, field) => df.withColumn(field.name, df(field.name).cast(field.dataType))
}
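
The foldLeft threads the DataFrame through one withColumn call per field of the target schema, so each column is replaced by its cast counterpart while the rest of the frame is left untouched.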

Depending on the format of the data, this might or might not be sufficient. For example, if fields that should be transformed to timestamps don't use standard formatting, casting won't work and you'll have to use Spark's datetime processing utilities.
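
For instance, assuming the date strings look like 17-OCT-2017 13.10.05 (a hypothetical format; adjust the pattern to match your actual data), a sketch using to_timestamp:

import org.apache.spark.sql.functions.{col, to_timestamp}

// Parse a non-standard date string into a proper timestamp column.
val withTimestamps = DF_frmOldParquet.withColumn(
  "INACTV_DT",
  to_timestamp(col("INACTV_DT"), "dd-MMM-yyyy HH.mm.ss")
)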

answered by zero323