I have a DataFrame df containing the results of a calculation process. I then store this DataFrame in a database for further use.
For example:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rowsRDD: RDD[Row] = sc.parallelize(Seq(
  Row("first", 2.0, 7.0),
  Row("second", 3.5, 2.5),
  Row("third", 7.0, 5.9)
))

val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("val1", DoubleType, true))
  .add(StructField("val2", DoubleType, true))

val df = spark.createDataFrame(rowsRDD, schema)
I need to check that all columns in the final DataFrame have specific data types. Of course, one way is to create the DataFrame with an explicit schema (as in the example above). However, changes to the data types can occasionally be introduced during the calculation process, after the initial DataFrame has been created (for example, when some formula applied to the DataFrame is changed).
Therefore, I want to double-check that the final DataFrame conforms to the initial schema. If it does not, I would like to apply the appropriate casts. Is there any way to do this?
To get the schema of a Spark DataFrame, call printSchema() on the DataFrame object: printSchema() prints the schema to the console (stdout), while show() displays the DataFrame's contents.
Spark DataFrame schemas are defined as a collection of typed columns: the entire schema is stored as a StructType and the individual columns are stored as StructFields.
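For instance, you can walk the schema programmatically. This is a minimal sketch based on the df built above; the trailing comments show the output you should get for that schema:

// Each StructField exposes the column name, its DataType and whether it is nullable.
df.schema.fields.foreach { f =>
  println(s"${f.name}: ${f.dataType} (nullable = ${f.nullable})")
}
// id: StringType (nullable = true)
// val1: DoubleType (nullable = true)
// val2: DoubleType (nullable = true)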
You can get the schema of a DataFrame with the schema method
df.schema // Or `df.printSchema` if you want to print it nicely on the standard output
Define a castColumn method
def castColumn(df: DataFrame, colName: String, randomDataType: DataType): DataFrame =
  df.withColumn(colName, df.col(colName).cast(randomDataType))
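For instance, a one-off cast of a single column would look like this (IntegerType is just an arbitrary illustration):

val dfVal1AsInt = castColumn(df, "val1", IntegerType)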
Then apply this method to all the columns you need to cast.
First, get an Array of tuples with the colName and the target dataType
// Assume both DataFrames have the same column names; sortBy handles the case where
// the columns are not in the same order.
// You can also iterate through dfOrigin.schema only and compare each dataType with the
// target dataType instead of zipping.
val differences = (dfOrigin.schema.fields.sortBy(_.name) zip dfTarget.schema.fields.sortBy(_.name)).collect {
  case (origin: StructField, target: StructField) if origin.dataType != target.dataType =>
    (origin.name, target.dataType)
}
Then
differences.foldLeft(df) { case (acc, value) => castColumn(acc, value._1, value._2) }
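Putting the pieces together with the df from the question, here is a minimal sketch that compares the computed DataFrame directly against the initial schema (the variant suggested in the comment above) and casts only the columns that drifted; expectedSchema, actualTypes and dfFixed are illustrative names, not part of the original answer:

import org.apache.spark.sql.types._

// The schema the final DataFrame is supposed to match (the initial one).
val expectedSchema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("val1", DoubleType, true))
  .add(StructField("val2", DoubleType, true))

// Pair each expected field with the actual field of the same name and keep the mismatches.
val actualTypes = df.schema.fields.map(f => f.name -> f.dataType).toMap
val differences = expectedSchema.fields.collect {
  case field if actualTypes.get(field.name).exists(_ != field.dataType) =>
    (field.name, field.dataType)
}

// Cast only the columns whose type drifted; a conforming DataFrame passes through unchanged.
val dfFixed = differences.foldLeft(df) { case (acc, (name, dataType)) =>
  castColumn(acc, name, dataType)
}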
Based on the Untyped Dataset Operations section of https://spark.apache.org/docs/2.2.0/sql-programming-guide.html, it should be:
df.printSchema()
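For the df defined in the question, this prints a schema tree like:

root
 |-- id: string (nullable = true)
 |-- val1: double (nullable = true)
 |-- val2: double (nullable = true)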