
Spark, Scala - determine column type

I can load data from a database and do some processing with it. The problem is that some tables store the date column as 'String', while others treat it as 'timestamp'.

I cannot know what type the date column is until the data is loaded.

x.getAs[String]("date")    // could fail when the date column is of timestamp type
x.getAs[Timestamp]("date") // could fail when the date column is of string type

This is how I load the data with Spark:

spark.read
  .format("jdbc")
  .option("url", url)
  .option("dbtable", table)
  .option("user", user)
  .option("password", password)
  .load()

Is there any way to treat them uniformly, or to always convert the column to a string?

J.Done asked Dec 27 '16


People also ask

How do I find the DataType of a column in Spark Scala?

In Spark you can get all DataFrame column names and types (DataType) by using df.dtypes and df.schema, where df is a DataFrame.
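For example, a minimal sketch (assuming an existing SparkSession named spark; the sample data is illustrative):

import spark.implicits._

val df = Seq(("a", "2016-02-01"), ("b", "2016-02-02")).toDF("key", "date")

// dtypes returns (columnName, typeName) pairs as strings
df.dtypes.foreach { case (name, tpe) => println(s"$name: $tpe") }

// schema returns the full StructType with DataType objects
println(df.schema)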

How do I change the DataType of a column in a DataFrame in Scala?

To change a Spark SQL DataFrame column from one data type to another, use the cast() function of the Column class; it works with withColumn(), select(), selectExpr(), and SQL expressions.
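A short sketch of both styles, assuming a DataFrame df with a string column named date (the names are illustrative):

import org.apache.spark.sql.functions.col

// cast via withColumn
val withTs = df.withColumn("date", col("date").cast("timestamp"))
// the same cast via selectExpr / SQL syntax
val withTs2 = df.selectExpr("CAST(date AS timestamp) AS date")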

How do I check if a column is numeric in Spark?

Unfortunately, Spark doesn't have an isNumeric() function, so you need to use existing functions to check whether a string column holds all or any numeric values. You may be tempted to write a Spark UDF for scenarios like this, but UDFs are not recommended because they do not perform well.
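One common approach, sketched below; the DataFrame df and the column name value are illustrative:

import org.apache.spark.sql.functions.col

// non-numeric strings become null when cast, so isNotNull keeps only numeric rows
val numericOnly = df.filter(col("value").cast("double").isNotNull)
// alternatively, a regular expression that matches digit-only strings
val digitsOnly = df.filter(col("value").rlike("^[0-9]+$"))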

How to get column names and types in a Spark DataFrame?

In Spark you can get all DataFrame column names and types (DataType) by using df.dtypes and df.schema, where df is a DataFrame. You can also look up the data type of a single column by name. Related: Convert Column Data Type in Spark DataFrame.
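A sketch of looking up a single column's type by name (assuming a DataFrame df with a column named date):

import org.apache.spark.sql.types.{DataType, TimestampType}

val dateType: DataType = df.schema("date").dataType
val dateIsTimestamp = dateType == TimestampType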

How to check whether a column's data type is integer or string in Spark?

To check the type of a specific DataFrame column, use df.schema, which returns all column names and types; from there you can read off a single column's type, or select all column names of a given type (for example, all string columns or all integer columns).
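A sketch of both checks (df and the column name value are illustrative):

import org.apache.spark.sql.types.{IntegerType, StringType}

// check whether one specific column is an integer
val valueIsInt = df.schema("value").dataType == IntegerType
// collect the names of all string-typed / integer-typed columns
val stringCols = df.schema.fields.filter(_.dataType == StringType).map(_.name)
val intCols    = df.schema.fields.filter(_.dataType == IntegerType).map(_.name)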

How to replace a column with a specific value in Spark?

Sometimes you may want to replace the values of all string-typed columns with a specific value, for example replace an empty string with a null value. To do so, you can use df.schema.fields to get all DataFrame columns and apply a filter to keep only the string columns. Refer to Spark Convert DataFrame Column Data Type.
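A sketch of replacing empty strings with nulls in every string-typed column (df is any DataFrame; this is one way to do it, not the only one):

import org.apache.spark.sql.functions.{col, lit, when}
import org.apache.spark.sql.types.StringType

val stringCols = df.schema.fields.filter(_.dataType == StringType).map(_.name)
// fold over the string columns, rewriting "" to null in each
val cleaned = stringCols.foldLeft(df) { (acc, c) =>
  acc.withColumn(c, when(col(c) === "", lit(null).cast(StringType)).otherwise(col(c)))
}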

What is a numeric data type in Spark?

Spark SQL defines a set of numeric data types, such as ShortType, the data type representing Short values (use the singleton DataTypes.ShortType), alongside StringType, the data type representing String values.


2 Answers

You can pattern-match on the type of the column (using the DataFrame's schema) to decide whether to parse the String into a Timestamp or just use the Timestamp as is - and use the unix_timestamp function to do the actual conversion:

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType

import spark.implicits._ // needed for toDF and the $"date" syntax

// preparing some example data - df1 with String type and df2 with Timestamp type
val df1 = Seq(("a", "2016-02-01"), ("b", "2016-02-02")).toDF("key", "date")
val df2 = Seq(
  ("a", new Timestamp(new SimpleDateFormat("yyyy-MM-dd").parse("2016-02-01").getTime)),
  ("b", new Timestamp(new SimpleDateFormat("yyyy-MM-dd").parse("2016-02-02").getTime))
).toDF("key", "date")

// If column is String, converts it to Timestamp
def normalizeDate(df: DataFrame): DataFrame = {
  df.schema("date").dataType match {
    case StringType => df.withColumn("date", unix_timestamp($"date", "yyyy-MM-dd").cast("timestamp"))
    case _ => df
  }
}

// after "normalizing", you can assume date has Timestamp type - 
// both would print the same thing:
normalizeDate(df1).rdd.map(r => r.getAs[Timestamp]("date")).foreach(println)
normalizeDate(df2).rdd.map(r => r.getAs[Timestamp]("date")).foreach(println)

Tzach Zohar answered Oct 03 '22


Here are a few things you can try:

(1) Start using the inferSchema option during load if you have a version that supports it. This makes Spark figure out the data types of the columns, though it doesn't work in all scenarios. Also look at the input data: if values are quoted, I advise adding an extra option to account for the quotes during the load.

val inputDF = spark.read.format("csv").option("header","true").option("inferSchema","true").load(fileLocation)
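If the values are wrapped in quotes, the CSV reader's quote option can be added; a sketch (the quote character to use depends on your data):

val inputDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("quote", "\"")
  .load(fileLocation)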

(2) To identify the data types of the columns you can use the code below; it places all of the column names and data types into their own Arrays of Strings.

val columnNames: Array[String] = inputDF.columns
val columnDataTypes: Array[String] = inputDF.schema.fields.map(_.dataType.toString)
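
A short usage sketch pairing the two arrays (assuming the inputDF from above):

columnNames.zip(columnDataTypes).foreach { case (name, dataType) =>
  println(s"$name: $dataType")
}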

afeldman answered Oct 03 '22