
Joining two Spark dataframes on time (TimestampType) in Python

I have two dataframes that I would like to join on one column, with the caveat that this column is a timestamp and two records should be joined only when their timestamps are within a certain offset (5 seconds) of each other. More specifically, a record in dates_df with date=1/3/2015:00:00:00 should be joined with a record in events_df with time=1/3/2015:00:00:01, because the two timestamps are within 5 seconds of each other.

I'm trying to get this logic working with PySpark, and it is extremely painful. How do people do joins like this in Spark?

My approach is to add two extra columns to dates_df that hold the lower_timestamp and upper_timestamp bounds (a 5-second offset on either side) and then perform a conditional join. This is where it fails, more specifically:

joined_df = dates_df.join(events_df, 
    dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)

joined_df.explain()

The plan captures only the last part of the condition:

Filter (time#6 < upper_timestamp#4)
 CartesianProduct
 ....

and it gives me a wrong result.

Do I really have to do a full-blown Cartesian join for each inequality, removing duplicates as I go along?

Here is the full code:

from datetime import datetime, timedelta

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf


master = 'local[*]'
app_name = 'stackoverflow_join'

conf = SparkConf().setAppName(app_name).setMaster(master)
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

def lower_range_func(x, offset=5):
    return x - timedelta(seconds=offset)

def upper_range_func(x, offset=5):
    return x + timedelta(seconds=offset)


lower_range = udf(lower_range_func, TimestampType())
upper_range = udf(upper_range_func, TimestampType())

dates_fields = [StructField("name", StringType(), True), StructField("date", TimestampType(), True)]
dates_schema = StructType(dates_fields)

dates = [('day_%s' % x, datetime(year=2015, day=x, month=1)) for x in range(1,5)]
dates_df = sqlContext.createDataFrame(dates, dates_schema)

dates_df.show()

# extend dates_df with time ranges
dates_df = dates_df.withColumn('lower_timestamp', lower_range(dates_df['date'])).\
           withColumn('upper_timestamp', upper_range(dates_df['date']))


event_fields = [StructField("time", TimestampType(), True), StructField("event", StringType(), True)]
event_schema = StructType(event_fields)

events = [(datetime(year=2015, day=3, month=1, second=3), 'meeting')]
events_df = sqlContext.createDataFrame(events, event_schema)

events_df.show()

# finally, join the data
joined_df = dates_df.join(events_df, 
    dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)    

joined_df.show()

I get the following output:

+-----+--------------------+
| name|                date|
+-----+--------------------+
|day_1|2015-01-01 00:00:...|
|day_2|2015-01-02 00:00:...|
|day_3|2015-01-03 00:00:...|
|day_4|2015-01-04 00:00:...|
+-----+--------------------+

+--------------------+-------+
|                time|  event|
+--------------------+-------+
|2015-01-03 00:00:...|meeting|
+--------------------+-------+


+-----+--------------------+--------------------+--------------------+--------------------+-------+
| name|                date|     lower_timestamp|     upper_timestamp|                time|  event|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
|day_3|2015-01-03 00:00:...|2015-01-02 23:59:...|2015-01-03 00:00:...|2015-01-03 00:00:...|meeting|
|day_4|2015-01-04 00:00:...|2015-01-03 23:59:...|2015-01-04 00:00:...|2015-01-03 00:00:...|meeting|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
Oleksiy asked Jun 03 '15 20:06


1 Answer

I ran the query in Spark SQL and used explain() to see how it is executed, then replicated the same behavior in Python. First, here is the Spark SQL version:

dates_df.registerTempTable("dates")
events_df.registerTempTable("events")
results = sqlContext.sql("SELECT * FROM dates INNER JOIN events ON dates.lower_timestamp < events.time AND events.time < dates.upper_timestamp")
results.explain()
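
As a sanity check on what this ON clause should return for the sample data in the question, here is a minimal plain-Python sketch (no Spark involved). The helper name range_join and the list-of-tuples layout are assumptions made for illustration; it applies both strict inequalities to every pair of rows, which is also what a Cartesian product followed by a filter does.

```python
from datetime import datetime, timedelta

# Sample rows mirroring the question's data: (name, date) and (time, event).
dates = [('day_%s' % d, datetime(year=2015, month=1, day=d)) for d in range(1, 5)]
events = [(datetime(year=2015, month=1, day=3, second=3), 'meeting')]


def range_join(dates, events, offset_seconds=5):
    """Pair each date with every event strictly within the offset (hypothetical helper)."""
    offset = timedelta(seconds=offset_seconds)
    matches = []
    for name, date in dates:
        lower, upper = date - offset, date + offset
        for time, event in events:
            # Same condition as the SQL ON clause: lower < time AND time < upper.
            if lower < time and time < upper:
                matches.append((name, event))
    return matches


print(range_join(dates, events))
```

For the sample data only day_3 pairs with the meeting, unlike the two rows in the question's output.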

The SQL version works, but the question was about how to do it in Python, so the solution seems to be a plain join followed by two filters:

joined_df = dates_df.join(events_df).filter(dates_df.lower_timestamp < events_df.time).filter(events_df.time < dates_df.upper_timestamp)
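
The two filters can also be folded into a single join condition by combining the comparisons with `&`, the operator PySpark columns use for a logical AND. The sketch below assumes dates_df and events_df have the columns built in the question; the function name join_within_window is made up for illustration. The parentheses are needed because `&` binds more tightly than `<` in Python.

```python
def join_within_window(dates_df, events_df):
    """Join rows whose event time lies strictly between the two bounds.

    Hypothetical helper: expects the DataFrames from the question, where
    dates_df has lower_timestamp/upper_timestamp and events_df has time.
    """
    # Each comparison builds a Column expression; `&` combines them into one
    # boolean Column. Python's `and` (and a chained `a < b < c`) would not.
    condition = (
        (dates_df.lower_timestamp < events_df.time)
        & (events_df.time < dates_df.upper_timestamp)
    )
    return dates_df.join(events_df, condition)
```

It would be called as joined_df = join_within_window(dates_df, events_df), and joined_df.explain() can then be compared against the plan of the SQL version.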

joined_df.explain() yields the same plan as results.explain() from the Spark SQL version, so I assume this is how it is done.
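
As for why the original chained comparison kept only time < upper_timestamp: Python evaluates `a < b < c` as `(a < b) and (b < c)`, and `and` returns its second operand whenever the first is truthy. The sketch below reproduces this with a toy Expr class, a stand-in invented for illustration and not PySpark's Column, whose `<` returns an expression object instead of a bool. Recent PySpark versions, as far as I know, raise a ValueError when a Column is used in a boolean context, so there the same mistake fails loudly instead of silently dropping a condition.

```python
class Expr:
    """Toy expression object (hypothetical, stands in for a column)."""

    def __init__(self, text):
        self.text = text

    def __lt__(self, other):
        # Build a new expression instead of returning True or False.
        return Expr('(%s < %s)' % (self.text, other.text))

    def __and__(self, other):
        # `&` can be overloaded; the `and` keyword cannot.
        return Expr('(%s AND %s)' % (self.text, other.text))


lower, time, upper = Expr('lower_timestamp'), Expr('time'), Expr('upper_timestamp')

# Chained form: equivalent to (lower < time) and (time < upper).
# The first Expr is truthy, so `and` hands back only the second one.
chained = lower < time < upper

# Explicit form: both comparisons survive.
combined = (lower < time) & (time < upper)

print(chained.text)
print(combined.text)
```

The chained form ends up holding only the upper-bound comparison, which matches the single Filter seen in the question's plan, while the `&` form keeps both inequalities.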

Oleksiy answered Nov 03 '22 00:11