I have a CSV in which one field is a datetime in a specific format. I cannot import it directly into my DataFrame because it needs to be a timestamp. So I import it as a string and convert it into a Timestamp like this:
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.Row

def getTimestamp(x: Any): Timestamp = {
  val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
  if (x.toString() == "")
    return null
  else {
    val d = format.parse(x.toString())
    val t = new Timestamp(d.getTime())
    return t
  }
}

def convert(row: Row): Row = {
  val d1 = getTimestamp(row(3))
  return Row(row(0), row(1), row(2), d1)
}
Is there a better, more concise way to do this with the DataFrame API or Spark SQL? The method above requires creating an RDD and specifying the schema for the DataFrame again.
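Roughly, applying it today looks like this (a sketch only; the column names and schema below are just illustrative):

import org.apache.spark.sql.types._

// Rebuild the DataFrame from an RDD, spelling out the schema again
val schema = StructType(Seq(
  StructField("a", StringType, true),
  StructField("b", StringType, true),
  StructField("c", StringType, true),
  StructField("dt", TimestampType, true)
))
val converted = sqlContext.createDataFrame(df.rdd.map(convert), schema)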
to_date() converts a string column (StringType) to a date column (DateType): it takes the date string as its first argument and the pattern the date is written in as its second argument. Likewise, to_timestamp() converts a string column to a timestamp (TimestampType) column, given a pattern such as MM/dd/yyyy HH:mm:ss describing the month, day, year, hours, minutes, and seconds. Both are available in PySpark and in the Scala DataFrame API.
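For reference, here is roughly what those two functions look like from the Scala side (a sketch only, Spark 2.2+, using the dts column from the answer below and assuming spark.implicits._ is in scope):

import org.apache.spark.sql.functions.{to_date, to_timestamp}

// to_date keeps only the date part, to_timestamp keeps date and time
df.select(
  to_date($"dts", "MM/dd/yyyy HH:mm:ss").as("d"),
  to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").as("ts")
).show(false)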
Spark >= 2.2
Since 2.2 you can provide the format string directly:
import org.apache.spark.sql.functions.to_timestamp

val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")
df.withColumn("ts", ts).show(2, false)

// +---+-------------------+-------------------+
// |id |dts                |ts                 |
// +---+-------------------+-------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01|
// |2  |#$@#@#             |null               |
// +---+-------------------+-------------------+
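If you prefer the SQL side over the DataFrame API, the same function is available there as well; a minimal sketch (assumes a SparkSession named spark; the view name is arbitrary):

df.createOrReplaceTempView("df")
spark.sql("SELECT id, dts, to_timestamp(dts, 'MM/dd/yyyy HH:mm:ss') AS ts FROM df").show(false)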
Spark >= 1.6, < 2.2
You can use the date processing functions introduced in Spark 1.5. Assuming you have the following data:
val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts")
You can use unix_timestamp to parse strings and cast the result to timestamp:
import org.apache.spark.sql.functions.unix_timestamp

val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp")
df.withColumn("ts", ts).show(2, false)

// +---+-------------------+---------------------+
// |id |dts                |ts                   |
// +---+-------------------+---------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01.0|
// |2  |#$@#@#             |null                 |
// +---+-------------------+---------------------+
As you can see, it covers both parsing and error handling. The format string should be compatible with Java SimpleDateFormat.
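Because malformed values simply become null, it is easy to inspect or drop them afterwards; a small sketch reusing df and ts from above:

val parsed = df.withColumn("ts", ts)

// Malformed rows end up with ts = null, so they can be examined or removed
parsed.filter($"ts".isNull).show(false)
parsed.na.drop(Seq("ts")).show(false)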
Spark >= 1.5, < 1.6
You'll have to use something like this:
unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")
or
(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp")
due to SPARK-11724.
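Applied to the DataFrame above, either workaround would look roughly like this (a sketch, not retested on 1.5 here):

import org.apache.spark.sql.functions.unix_timestamp

val ts = (unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp")
df.withColumn("ts", ts).show(2, false)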
Spark < 1.5
You should be able to use these with expr and HiveContext.
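I have not tested this on such old versions, but a sketch of that idea, using selectExpr here for illustration, could look like:

// unix_timestamp is exposed as a Hive UDF through HiveContext, so the same
// millisecond workaround can be written as a SQL expression
df.selectExpr(
  "id",
  "dts",
  "cast(unix_timestamp(dts, 'MM/dd/yyyy HH:mm:ss') * 1000 as timestamp) as ts"
)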
I haven't played with Spark SQL yet, but I think this would be more idiomatic Scala (using null is not considered good practice):
import scala.util.{Try, Success, Failure}

def getTimestamp(s: String): Option[Timestamp] = s match {
  case "" => None
  case _ => {
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    Try(new Timestamp(format.parse(s).getTime)) match {
      case Success(t) => Some(t)
      case Failure(_) => None
    }
  }
}
Please note that I assume you know the Row element types beforehand (if you read them from a CSV file, they are all String), which is why I use a proper type like String and not Any (everything is a subtype of Any).
It also depends on how you want to handle parsing exceptions. In this case, if a parsing exception occurs, None is simply returned.
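For example, with the getTimestamp above in scope:

getTimestamp("05/26/2016 01:01:01") // Some(2016-05-26 01:01:01.0)
getTimestamp("")                    // None: empty string
getTimestamp("#$@#@#")              // None: parse failure caught by Try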
You could use it further on with:
// orNull turns the Option back into a Timestamp or null for the Row
rows.map(row => Row(row(0), row(1), row(2), getTimestamp(row(3).toString).orNull))