
Better way to convert a string field into timestamp in Spark

I have a CSV in which one field is a datetime in a specific format. I cannot import it directly into my DataFrame because it needs to be a timestamp. So I import it as a string and convert it into a Timestamp like this:

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.Row

def getTimestamp(x: Any): Timestamp = {
  val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
  if (x.toString() == "")
    return null
  else {
    val d = format.parse(x.toString())
    val t = new Timestamp(d.getTime())
    return t
  }
}

def convert(row: Row): Row = {
  val d1 = getTimestamp(row(3))
  return Row(row(0), row(1), row(2), d1)
}

Is there a better, more concise way to do this with the DataFrame API or Spark SQL? The method above requires creating an RDD and specifying the schema for the DataFrame again.

asked Apr 24 '15 by user568109


2 Answers

Spark >= 2.2

Since Spark 2.2 you can provide the format string directly:

import org.apache.spark.sql.functions.to_timestamp

val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+-------------------+
// |id |dts                |ts                 |
// +---+-------------------+-------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01|
// |2  |#$@#@#             |null               |
// +---+-------------------+-------------------+

Spark >= 1.6, < 2.2

You can use the date processing functions introduced in Spark 1.5. Assuming you have the following data:

val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts") 
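As an aside, the toDF call and the $"col" syntax used throughout assume Spark's implicits are in scope. A minimal setup sketch, assuming Spark >= 2.0 and a session named spark (on 1.x, an SQLContext and import sqlContext.implicits._ play the same role):

import org.apache.spark.sql.SparkSession

// Hypothetical local setup for experimenting with the snippets here.
val spark = SparkSession.builder()
  .appName("timestamp-parsing")
  .master("local[*]")
  .getOrCreate()

// Enables .toDF on Seq and the $"col" column interpolator.
import spark.implicits._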

You can use unix_timestamp to parse strings and cast the result to timestamp:

import org.apache.spark.sql.functions.unix_timestamp

val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+---------------------+
// |id |dts                |ts                   |
// +---+-------------------+---------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01.0|
// |2  |#$@#@#             |null                 |
// +---+-------------------+---------------------+

As you can see, it covers both parsing and error handling. The format string should be compatible with Java SimpleDateFormat.
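If you're unsure whether a pattern matches your data, one way to sanity-check it is with plain SimpleDateFormat outside Spark; a quick sketch:

import java.text.SimpleDateFormat

// If this parses without throwing, the same pattern works for unix_timestamp.
val fmt = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss")
println(fmt.parse("05/26/2016 01:01:01")) // e.g. Thu May 26 01:01:01 ... 2016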

Spark >= 1.5, < 1.6

You'll have to use something like this:

unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp") 

or

(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp") 

due to SPARK-11724.
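For example, applied to the same df as above, the first workaround would look like:

import org.apache.spark.sql.functions.unix_timestamp

// SPARK-11724 workaround: go through double before casting to timestamp.
val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")
  .cast("double")
  .cast("timestamp")

df.withColumn("ts", ts).show(2, false)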

Spark < 1.5

You should be able to use these with expr and HiveContext.
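A rough sketch of the HiveContext route, since unix_timestamp is available as a Hive UDF there (the hiveContext value and the "events" table name are hypothetical):

// Hypothetical names: hiveContext and the "events" temp table are illustrative.
df.registerTempTable("events")

// Same double-cast hedge as above to avoid SPARK-11724 semantics.
val withTs = hiveContext.sql(
  "SELECT id, dts, CAST(CAST(unix_timestamp(dts, 'MM/dd/yyyy HH:mm:ss') AS double) AS timestamp) AS ts FROM events")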

answered Oct 04 '22 by zero323


I haven't played with Spark SQL yet, but I think this would be more idiomatic Scala (using null is not considered good practice):

import java.sql.Timestamp
import java.text.SimpleDateFormat
import scala.util.{Failure, Success, Try}

def getTimestamp(s: String): Option[Timestamp] = s match {
  case "" => None
  case _ =>
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    Try(new Timestamp(format.parse(s).getTime)) match {
      case Success(t) => Some(t)
      case Failure(_) => None
    }
}

Note that I assume you know the Row element types beforehand (if you read them from a CSV file, they are all String); that's why I use a proper type like String instead of Any (everything is a subtype of Any).

It also depends on how you want to handle parsing exceptions. In this case, if a parsing exception occurs, None is simply returned.

You could use it further on with:

rows.map(row => Row(row(0), row(1), row(2), getTimestamp(row.getString(3))))
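One caveat: if the resulting Rows feed a DataFrame with an explicit schema, the timestamp slot should hold a Timestamp or null rather than an Option, so unwrapping with orNull may be needed; a hypothetical variant:

// Store the parsed Timestamp, or null when parsing failed, instead of an Option.
rows.map(row => Row(row(0), row(1), row(2), getTimestamp(row.getString(3)).orNull))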
answered Oct 04 '22 by jarandaf