How do you do a roundtrip conversion of timestamp data from Spark Python to Pandas and back? I read data from a Hive table in Spark, want to do some calculations in Pandas, and write the results back to Hive. Only the last step is failing: converting a Pandas timestamp back to a Spark DataFrame timestamp.
import datetime
import pandas as pd
dates = [
('today', '2017-03-03 11:30:00')
, ('tomorrow', '2017-03-04 08:00:00')
, ('next Thursday', '2017-03-09 20:00:00')
]
string_date_rdd = sc.parallelize(dates)
timestamp_date_rdd = string_date_rdd.map(lambda t: (t[0], datetime.datetime.strptime(t[1], '%Y-%m-%d %H:%M:%S')))
timestamp_df = sqlContext.createDataFrame(timestamp_date_rdd, ['Day', 'Date'])
timestamp_pandas_df = timestamp_df.toPandas()
roundtrip_df = sqlContext.createDataFrame(timestamp_pandas_df)
roundtrip_df.printSchema()
roundtrip_df.show()
root
|-- Day: string (nullable = true)
|-- Date: long (nullable = true)
+-------------+-------------------+
| Day| Date|
+-------------+-------------------+
| today|1488540600000000000|
| tomorrow|1488614400000000000|
|next Thursday|1489089600000000000|
+-------------+-------------------+
At this point the roundtrip Spark DataFrame has the Date column as data type long. In PySpark this can be converted back to a datetime object easily, e.g., datetime.datetime.fromtimestamp(1489089600000000000 / 1000000000), although the time of day is off by a few hours. How do I do this conversion on the Spark DataFrame itself, i.e., change the column's data type back to a timestamp?
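For example, a quick check in plain Python with the last value from the table above:

import datetime

# Divide the nanosecond epoch value by 1e9 to get seconds, then convert.
datetime.datetime.fromtimestamp(1489089600000000000 / 1000000000)
# -> a datetime on 2017-03-09, but the hour is shifted by the local UTC offset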
Python 3.4.5, Spark 1.6.0
Thanks, John
Convert PySpark DataFrame to Pandas DataFrame
PySpark DataFrame provides a toPandas() method to convert it to a Python Pandas DataFrame. toPandas() collects all records of the PySpark DataFrame to the driver program, so it should only be done on a small subset of the data.
Looking at the source code for toPandas(), one reason it may be slow is that it first creates the pandas DataFrame and then copies each of the Series in that DataFrame over to the returned DataFrame.
If you are working on a machine learning application dealing with larger datasets, PySpark is a better fit, as it can process operations many times (up to 100x) faster than Pandas.
If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the SQL module with the command pip install pyspark[sql]. Otherwise, you must ensure that PyArrow is installed and available on all cluster nodes. You can install it using pip or conda from the conda-forge channel.
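For reference, on newer Spark releases (Arrow-backed conversion was added in Spark 2.3, so it does not apply to the Spark 1.6.0 setup in the question) the Arrow path is switched on with a single configuration property. A minimal sketch, assuming PyArrow is installed:

# Sketch for Spark 2.3+ with PyArrow available; not applicable to Spark 1.6.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("arrow-roundtrip").getOrCreate()

# Enable Arrow-based columnar transfers between the JVM and Python.
# (On Spark 3.x the property is spark.sql.execution.arrow.pyspark.enabled.)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

pdf = timestamp_df.toPandas()              # uses Arrow when the flag is set
roundtrip_df = spark.createDataFrame(pdf)  # datetime64[ns] maps back to a Spark timestamp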
Here's one solution I found:
from pyspark.sql.types import TimestampType
extra_column_df = roundtrip_df.select(roundtrip_df.Day, roundtrip_df.Date).withColumn('new_date', roundtrip_df.Date / 1000000000)
roundtrip_timestamp_df = extra_column_df.select(extra_column_df.Day, extra_column_df.new_date.cast(TimestampType()).alias('Date'))
Outputs:
root
|-- Day: string (nullable = true)
|-- Date: timestamp (nullable = true)
+-------------+--------------------+
|          Day|                Date|
+-------------+--------------------+
| today|2017-03-03 11:30:...|
| tomorrow|2017-03-04 08:00:...|
|next Thursday|2017-03-09 20:00:...|
+-------------+--------------------+
As an additional bug (or feature), this seems to convert all the dates to UTC, with DST taken into account.
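Another way to side-step the long column entirely is to rebuild the rows with plain Python datetime objects before calling createDataFrame, so Spark infers TimestampType directly. A rough sketch using the variable names from the question (not tested on Spark 1.6):

# timestamp_pandas_df is the pandas DataFrame produced by toPandas() above.
rows = [
    (day, ts.to_pydatetime())  # pandas.Timestamp -> datetime.datetime
    for day, ts in zip(timestamp_pandas_df['Day'], timestamp_pandas_df['Date'])
]
roundtrip_df2 = sqlContext.createDataFrame(rows, ['Day', 'Date'])
roundtrip_df2.printSchema()    # Date comes back as timestamp rather than long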