I load data from a database and do some processing with it. The problem is that some tables have the date column as a 'String', while others store it as a 'Timestamp'.
I cannot know which type the date column is until the data is loaded.
> x.getAs[String]("date")    // could fail when the date column is a Timestamp
> x.getAs[Timestamp]("date") // could fail when the date column is a String
This is how I load the data with Spark:
spark.read
  .format("jdbc")
  .option("url", url)
  .option("dbtable", table)
  .option("user", user)
  .option("password", password)
  .load()
Is there any way to treat them uniformly, or to always convert the column to a string?
In Spark you can get all DataFrame column names and types (DataType) by using df.dtypes and df.schema, where df is a DataFrame.
To change a Spark SQL DataFrame column from one data type to another, use the cast() function of the Column class; it can be applied through withColumn(), select(), selectExpr(), or a SQL expression.
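As a minimal sketch of that (assuming a hypothetical DataFrame df with "key" and "date" columns, which are not defined in the text above):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.TimestampType

// Cast via the Column API on withColumn()
val viaWithColumn = df.withColumn("date", col("date").cast(TimestampType))
// The same cast via a SQL expression in selectExpr()
val viaSelectExpr = df.selectExpr("key", "CAST(date AS timestamp) AS date")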
Unfortunately, Spark doesn't have an isNumeric() function, so you need to combine existing functions to check whether a string column contains only numeric values. You may be tempted to write a Spark UDF for scenarios like this, but UDFs are not recommended because they don't perform well.
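One common workaround (my own sketch, assuming a DataFrame df with a string column "value") is to cast the column to a numeric type and check whether any non-null value fails the cast:

import org.apache.spark.sql.functions.col

// Values that cannot be cast to int come back as null, so the column is
// all-numeric exactly when no non-null value fails the cast.
val allNumeric = df
  .filter(col("value").isNotNull && col("value").cast("int").isNull)
  .count() == 0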
Sometimes you may want to replace all string-type columns with a specific value, for example replace an empty string with a null value in Spark. To do so you can use df.schema.fields to get all DataFrame columns and apply a filter to keep only the string columns.
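A short sketch of that idea (df and the empty-string-to-null replacement are assumptions for illustration):

import org.apache.spark.sql.functions.{col, when}
import org.apache.spark.sql.types.StringType

// Keep only the StringType columns from the schema...
val stringCols = df.schema.fields.filter(_.dataType == StringType).map(_.name)
// ...and replace empty strings with null in each of them.
val cleaned = stringCols.foldLeft(df) { (acc, c) =>
  acc.withColumn(c, when(col(c) === "", null).otherwise(col(c)))
}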
You can pattern-match on the column's type (using the DataFrame's schema) to decide whether to parse the String into a Timestamp or use the Timestamp as is, and use the unix_timestamp function to do the actual conversion:
import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import spark.implicits._  // needed for toDF and the $"date" syntax (assumes a SparkSession named spark is in scope)
// preparing some example data - df1 with String type and df2 with Timestamp type
val df1 = Seq(("a", "2016-02-01"), ("b", "2016-02-02")).toDF("key", "date")
val df2 = Seq(
  ("a", new Timestamp(new SimpleDateFormat("yyyy-MM-dd").parse("2016-02-01").getTime)),
  ("b", new Timestamp(new SimpleDateFormat("yyyy-MM-dd").parse("2016-02-02").getTime))
).toDF("key", "date")
// If column is String, converts it to Timestamp
def normalizeDate(df: DataFrame): DataFrame = {
  df.schema("date").dataType match {
    case StringType => df.withColumn("date", unix_timestamp($"date", "yyyy-MM-dd").cast("timestamp"))
    case _ => df
  }
}
// after "normalizing", you can assume date has Timestamp type -
// both would print the same thing:
normalizeDate(df1).rdd.map(r => r.getAs[Timestamp]("date")).foreach(println)
normalizeDate(df2).rdd.map(r => r.getAs[Timestamp]("date")).foreach(println)
Here are a few things you can try:
(1) Use the inferSchema option during the load if your version supports it. This has Spark figure out the data type of each column, though it doesn't work in all scenarios. Also look at the input data: if it contains quotes, I advise adding an extra option to account for them during the load.
val inputDF = spark.read.format("csv").option("header","true").option("inferSchema","true").load(fileLocation)
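For the note about quotes, a sketch of what the extra options could look like (the exact quote and escape characters depend on your data):

val quotedDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("quote", "\"")   // character used to quote fields
  .option("escape", "\"")  // character used to escape quotes inside quoted fields
  .load(fileLocation)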
(2) To identify the data type of a column you can use the code below; it places all of the column names and data types into their own Arrays of Strings.
val columnNames : Array[String] = inputDF.columns
val columnDataTypes : Array[String] = inputDF.schema.fields.map(x=>x.dataType).map(x=>x.toString)
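As a quick usage sketch of those two arrays, and of looking up a single column's type directly (which is what the original question needs for its "date" column, assuming the loaded DataFrame has one):

// Print each column name together with its data type
columnNames.zip(columnDataTypes).foreach { case (name, dt) => println(s"$name: $dt") }

// Or check a single column's type straight from the schema
import org.apache.spark.sql.types.StringType
val dateIsString = inputDF.schema("date").dataType == StringType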