 

Timestamp roundtrip from Spark Python to Pandas and back

How do you do a roundtrip conversion of timestamp data from Spark Python to Pandas and back? I read data from a Hive table in Spark, want to do some calculations in Pandas, and write the results back to Hive. Only the last step fails: converting the Pandas timestamps back into Spark DataFrame timestamps.

import datetime
import pandas as pd

dates = [
    ('today', '2017-03-03 11:30:00')
  , ('tomorrow', '2017-03-04 08:00:00')
  , ('next Thursday', '2017-03-09 20:00:00')
]
string_date_rdd = sc.parallelize(dates)
timestamp_date_rdd = string_date_rdd.map(lambda t: (t[0], datetime.datetime.strptime(t[1], "%Y-%m-%d %H:%M:%S")))
timestamp_df = sqlContext.createDataFrame(timestamp_date_rdd, ['Day', 'Date'])
timestamp_pandas_df = timestamp_df.toPandas()
roundtrip_df = sqlContext.createDataFrame(timestamp_pandas_df)
roundtrip_df.printSchema()
roundtrip_df.show()

root
 |-- Day: string (nullable = true)
 |-- Date: long (nullable = true)

+-------------+-------------------+
|          Day|               Date|
+-------------+-------------------+
|        today|1488540600000000000|
|     tomorrow|1488614400000000000|
|next Thursday|1489089600000000000|
+-------------+-------------------+

At this point the roundtrip Spark DataFrame has the Date column as datatype long. In PySpark this value can be converted back to a datetime object easily, e.g., datetime.datetime.fromtimestamp(1489089600000000000 / 1000000000), although the time of day is off by a few hours. How do I convert the data type of the column in the Spark DataFrame itself?
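(For context: the long values are nanoseconds since the Unix epoch, and the few-hours offset comes from fromtimestamp() applying the local timezone. A quick stdlib-only sanity check, using the "next Thursday" value from the table above:)

```python
import datetime

ns = 1489089600000000000  # epoch nanoseconds, as stored in the long Date column
# utcfromtimestamp() interprets epoch seconds as UTC; fromtimestamp() would
# shift the result into the local timezone, producing the offset seen above.
print(datetime.datetime.utcfromtimestamp(ns / 10**9))
# 2017-03-09 20:00:00
```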

Python 3.4.5, Spark 1.6.0

Thanks, John

John Todd asked Mar 03 '17 16:03



1 Answer

Here's one solution I found:

from pyspark.sql.types import TimestampType

extra_column_df = roundtrip_df.select(roundtrip_df.Day, roundtrip_df.Date) \
    .withColumn('new_date', roundtrip_df.Date / 1000000000)
roundtrip_timestamp_df = extra_column_df.select(
    extra_column_df.Day,
    extra_column_df.new_date.cast(TimestampType()).alias('Date'))

Outputs:

root
 |-- Day: string (nullable = true)
 |-- Date: timestamp (nullable = true)

+-------------+--------------------+
|          Day|                Date|
+-------------+--------------------+
|        today|2017-03-03 11:30:...|
|     tomorrow|2017-03-04 08:00:...|
|next Thursday|2017-03-09 20:00:...|
+-------------+--------------------+

Whether as a bug or a feature, this seems to convert all the dates to UTC, with DST taken into account.
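That UTC behavior traces back to pandas itself: a naive datetime64[ns] column is stored internally as nanoseconds since the UTC epoch, which is exactly the long column that createDataFrame() picked up. A pandas-only sketch (no Spark required) exposing those stored values:

```python
import pandas as pd

ts = pd.to_datetime(['2017-03-03 11:30:00',
                     '2017-03-04 08:00:00',
                     '2017-03-09 20:00:00'])
# astype('int64') exposes the underlying epoch-nanosecond storage, which
# matches the long values shown in the roundtrip Spark DataFrame above.
print(ts.astype('int64').tolist())
# [1488540600000000000, 1488614400000000000, 1489089600000000000]
```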

John Todd answered Nov 15 '22 09:11