I have two dataframes and I would like to join them on one column, with the caveat that this column is a timestamp, and two records should be joined only if their timestamps are within a certain offset (5 seconds) of each other. More specifically, a record in dates_df with date=1/3/2015:00:00:00 should be joined with a record in events_df with time=1/3/2015:00:00:01, because the two timestamps are within 5 seconds of each other.
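The matching rule itself is a window predicate on the absolute time difference. The minimal pure-Python sketch below makes the 5-second condition concrete. `within_offset` is a hypothetical helper, not part of Spark, and it uses a strict `<` to match the strict inequalities used later in the question.

```python
from datetime import datetime, timedelta


def within_offset(date, time, offset_seconds=5):
    """Hypothetical helper: True if the timestamps are strictly less than offset_seconds apart."""
    return abs(date - time) < timedelta(seconds=offset_seconds)


# The example pair from the question: 1/3/2015 00:00:00 vs 1/3/2015 00:00:01
print(within_offset(datetime(2015, 1, 3, 0, 0, 0), datetime(2015, 1, 3, 0, 0, 1)))  # True
# Almost a full day apart, so no match
print(within_offset(datetime(2015, 1, 4, 0, 0, 0), datetime(2015, 1, 3, 0, 0, 1)))  # False
```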
I'm trying to get this logic working with PySpark, and it is extremely painful. How do people do joins like this in Spark?
My approach is to add two extra columns to dates_df, lower_timestamp and upper_timestamp, that hold the bounds of a 5-second window around each date, and then perform a conditional join. This is where it fails, more specifically:
joined_df = dates_df.join(events_df,
dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)
joined_df.explain()
The plan captures only the last part of the condition:
Filter (time#6 < upper_timestamp#4)
CartesianProduct
....
and it gives me a wrong result.
Do I really have to do a full-blown Cartesian join for each inequality and remove duplicates as I go along?
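The plan above is consistent with Python's chained-comparison semantics: `a < b < c` is evaluated as `(a < b) and (b < c)`. Because `and` cannot be overloaded, a Column object that is simply truthy (as the observed plan suggests for the PySpark version used here) makes the first comparison disappear; newer PySpark versions raise a ValueError asking you to use `&` instead. The toy `Column` class below is a hypothetical stand-in, not pyspark, that reproduces the effect. In real PySpark the corresponding fix would be to write the condition as `(dates_df.lower_timestamp < events_df.time) & (events_df.time < dates_df.upper_timestamp)`, with the parentheses, inside the `join` call.

```python
class Column:
    """Toy stand-in for a Spark SQL column expression (hypothetical, not pyspark)."""

    def __init__(self, expr):
        self.expr = expr

    def __lt__(self, other):
        # Comparisons build a new expression instead of returning a bool.
        return Column(f"({self.expr} < {other.expr})")

    def __and__(self, other):
        # `&` is overloadable, so both sides end up in the expression.
        return Column(f"({self.expr} AND {other.expr})")

    # No __bool__ is defined, so every Column instance is truthy.


lower = Column("lower_timestamp")
time = Column("time")
upper = Column("upper_timestamp")

# Python rewrites `a < b < c` as `(a < b) and (b < c)`. The first Column is
# truthy, so `and` just returns the second comparison.
chained = lower < time < upper
print(chained.expr)   # (time < upper_timestamp)

# Explicit `&` with parentheses keeps both conditions.
combined = (lower < time) & (time < upper)
print(combined.expr)  # ((lower_timestamp < time) AND (time < upper_timestamp))
```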
Here is the full code:
from datetime import datetime, timedelta
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf
master = 'local[*]'
app_name = 'stackoverflow_join'
conf = SparkConf().setAppName(app_name).setMaster(master)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
# Shift a timestamp by -/+ offset seconds to build the bounds of the join window
def lower_range_func(x, offset=5):
    return x - timedelta(seconds=offset)
def upper_range_func(x, offset=5):
    return x + timedelta(seconds=offset)
lower_range = udf(lower_range_func, TimestampType())
upper_range = udf(upper_range_func, TimestampType())
dates_fields = [StructField("name", StringType(), True), StructField("date", TimestampType(), True)]
dates_schema = StructType(dates_fields)
dates = [('day_%s' % x, datetime(year=2015, day=x, month=1)) for x in range(1,5)]
dates_df = sqlContext.createDataFrame(dates, dates_schema)
dates_df.show()
# extend dates_df with time ranges
dates_df = dates_df.withColumn('lower_timestamp', lower_range(dates_df['date'])).\
    withColumn('upper_timestamp', upper_range(dates_df['date']))
event_fields = [StructField("time", TimestampType(), True), StructField("event", StringType(), True)]
event_schema = StructType(event_fields)
events = [(datetime(year=2015, day=3, month=1, second=3), 'meeting')]
events_df = sqlContext.createDataFrame(events, event_schema)
events_df.show()
# finally, join the data
joined_df = dates_df.join(events_df,
dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)
joined_df.show()
I get the following output:
+-----+--------------------+
| name| date|
+-----+--------------------+
|day_1|2015-01-01 00:00:...|
|day_2|2015-01-02 00:00:...|
|day_3|2015-01-03 00:00:...|
|day_4|2015-01-04 00:00:...|
+-----+--------------------+
+--------------------+-------+
| time| event|
+--------------------+-------+
|2015-01-03 00:00:...|meeting|
+--------------------+-------+
+-----+--------------------+--------------------+--------------------+--------------------+-------+
| name| date| lower_timestamp| upper_timestamp| time| event|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
|day_3|2015-01-03 00:00:...|2015-01-02 23:59:...|2015-01-03 00:00:...|2015-01-03 00:00:...|meeting|
|day_4|2015-01-04 00:00:...|2015-01-03 23:59:...|2015-01-04 00:00:...|2015-01-03 00:00:...|meeting|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
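The two rows above are exactly what you get if only `time < upper_timestamp` is applied: day_4's upper bound (2015-01-04 00:00:05) is also later than the event, and nothing checks its lower bound. The small pure-Python reproduction below uses the same data as plain tuples instead of DataFrames. `join` is a hypothetical helper that tests every (date, event) pair, which mirrors the CartesianProduct + Filter plan.

```python
from datetime import datetime, timedelta

OFFSET = timedelta(seconds=5)

# Same data as the question: day_1..day_4 at midnight, one event at 2015-01-03 00:00:03
dates = [("day_%s" % x, datetime(2015, 1, x)) for x in range(1, 5)]
events = [(datetime(2015, 1, 3, 0, 0, 3), "meeting")]

# Add the lower/upper bounds, as the two withColumn calls do
dates_with_bounds = [(name, d, d - OFFSET, d + OFFSET) for name, d in dates]


def join(predicate):
    """Nested-loop join: every (date, event) pair is tested against predicate."""
    return [name for name, d, lo, hi in dates_with_bounds
            for t, _event in events if predicate(lo, t, hi)]


# Only the last comparison, which is what the chained expression kept
print(join(lambda lo, t, hi: t < hi))              # ['day_3', 'day_4']
# Both comparisons
print(join(lambda lo, t, hi: lo < t and t < hi))   # ['day_3']
```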
I ran the equivalent Spark SQL query with explain() to see how Spark plans it, and replicated the same behavior in Python. First, here is how to do it in Spark SQL:
dates_df.registerTempTable("dates")
events_df.registerTempTable("events")
results = sqlContext.sql("SELECT * FROM dates INNER JOIN events ON dates.lower_timestamp < events.time and events.time < dates.upper_timestamp")
results.explain()
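To check that the two-sided ON clause is ordinary SQL range-join semantics, the sketch below runs the same kind of query against an in-memory SQLite database from Python's standard library. SQLite is only a stand-in for Spark SQL here, the timestamps are stored as ISO-8601 text, and the bounds are computed inline with SQLite's `datetime()` modifiers instead of stored lower_timestamp/upper_timestamp columns.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# 'YYYY-MM-DD HH:MM:SS' text compares correctly with plain string comparison
conn.executescript("""
    CREATE TABLE dates (name TEXT, date TEXT);
    CREATE TABLE events (time TEXT, event TEXT);
    INSERT INTO dates VALUES ('day_1', '2015-01-01 00:00:00'),
                             ('day_2', '2015-01-02 00:00:00'),
                             ('day_3', '2015-01-03 00:00:00'),
                             ('day_4', '2015-01-04 00:00:00');
    INSERT INTO events VALUES ('2015-01-03 00:00:03', 'meeting');
""")

# Both inequalities in the ON clause, combined with AND
rows = conn.execute("""
    SELECT d.name, e.event
    FROM dates d
    INNER JOIN events e
      ON datetime(d.date, '-5 seconds') < e.time
     AND e.time < datetime(d.date, '+5 seconds')
""").fetchall()
print(rows)  # [('day_3', 'meeting')]
```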
The Spark SQL query above works, but the question was about the Python DataFrame API, where the solution seems to be a plain join followed by two filters:
joined_df = dates_df.join(events_df).filter(dates_df.lower_timestamp < events_df.time).filter(events_df.time < dates_df.upper_timestamp)
joined_df.explain()
yields the same plan as results.explain() for the Spark SQL query, so I assume this is how things are done.
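Both plans still compare every date with every event. One common way to avoid a full Cartesian product, sketched here in pure Python and not taken from the question, is to cut time into buckets as wide as the offset, match each date only against events in its own bucket and the two neighbouring buckets, and only then apply the exact inequality. In Spark the same idea would roughly be an equi-join on a computed bucket column followed by the two filters. `bucket` and `bucketed_range_join` are hypothetical helper names.

```python
from collections import defaultdict
from datetime import datetime, timedelta

OFFSET = timedelta(seconds=5)
EPOCH = datetime(1970, 1, 1)


def bucket(ts, width=OFFSET):
    """Integer id of the fixed-width time bucket containing ts."""
    return (ts - EPOCH) // width


def bucketed_range_join(dates, events, offset=OFFSET):
    """Match (name, date) with (time, event) when date - offset < time < date + offset.

    Buckets are `offset` wide, so a matching event can only sit in the date's
    own bucket or one of its two neighbours; only those candidates are tested.
    """
    events_by_bucket = defaultdict(list)
    for t, ev in events:
        events_by_bucket[bucket(t, offset)].append((t, ev))

    result = []
    for name, d in dates:
        b = bucket(d, offset)
        for nb in (b - 1, b, b + 1):
            for t, ev in events_by_bucket.get(nb, []):
                if d - offset < t < d + offset:  # exact check, same as the two filters
                    result.append((name, ev))
    return result


dates = [("day_%s" % x, datetime(2015, 1, x)) for x in range(1, 5)]
events = [(datetime(2015, 1, 3, 0, 0, 3), "meeting")]
print(bucketed_range_join(dates, events))  # [('day_3', 'meeting')]
```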