What’s a performant way to do fuzzy joins in PySpark?
I am looking for the community's views on a scalable approach to joining large Spark DataFrames on a nearest key condition. Allow me to illustrate this problem by means of a representative example. Suppose we have the following Spark DataFrame containing events occurring at some point in time:
ddf_event = spark.createDataFrame(
    data=[
        [1, 'A'],
        [5, 'A'],
        [10, 'B'],
        [15, 'A'],
        [20, 'B'],
        [25, 'B'],
        [30, 'A']
    ],
    schema=['ts_event', 'event']
)
and the following Spark DataFrame containing GPS data measured at some point in time:
ddf_gps = spark.createDataFrame(
    data=[
        [2, '(-46.84635, 173.13674)'],
        [4, '(2.50362, 104.34136)'],
        [8, '(-24.20741, 51.80755)'],
        [15, '(-59.07798, -20.49141)'],
        [18, '(-44.34468, -167.90401)'],
        [24, '(-18.84175, 16.68628)'],
        [27, '(20.48501,58.42423)']
    ],
    schema=['ts_gps', 'gps_coordinates']
)
which we would like to join to produce the following resulting DataFrame:
+--------+-----+------+-----------------------+
|ts_event|event|ts_gps|gps_coordinates |
+--------+-----+------+-----------------------+
|1 |A |2 |(-46.84635, 173.13674) |
|5 |A |4 |(2.50362, 104.34136) |
|10 |B |8 |(-24.20741, 51.80755) |
|15 |A |15 |(-59.07798, -20.49141) |
|20 |B |18 |(-44.34468, -167.90401)|
|25 |B |24 |(-18.84175, 16.68628) |
|30 |A |27 |(20.48501,58.42423) |
+--------+-----+------+-----------------------+
effectively matching each event with the GPS data point whose timestamp is nearest to the event timestamp.
We thus run into the problem of joining on a nearest-key condition, 'nearest' in this case being defined as the smallest absolute difference between the two timestamps.
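To pin down the intended semantics, here is a plain (non-distributed) Python sketch of the matching logic applied to the example data above; it is only a reference for what the join should produce, not a scalable solution:

events = [(1, 'A'), (5, 'A'), (10, 'B'), (15, 'A'), (20, 'B'), (25, 'B'), (30, 'A')]
gps = [
    (2, '(-46.84635, 173.13674)'),
    (4, '(2.50362, 104.34136)'),
    (8, '(-24.20741, 51.80755)'),
    (15, '(-59.07798, -20.49141)'),
    (18, '(-44.34468, -167.90401)'),
    (24, '(-18.84175, 16.68628)'),
    (27, '(20.48501,58.42423)')
]

# For every event, pick the GPS row with the smallest absolute timestamp difference.
result = [
    (ts_event, event, *min(gps, key=lambda g: abs(g[0] - ts_event)))
    for ts_event, event in events
]
# result reproduces the rows of the expected DataFrame above.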
I've explored two approaches to achieve this: one based on a filtered binned join (FBJ) and one based on a filtered sorted union (FSU). Both approaches are described below in more detail.
The FBJ approach depends on the parameter bin_size, which limits the size of the time window within which a matching GPS timestamp may be found. Increasing bin_size increases the computational load; decreasing it decreases the outcome quality.
Neither approach appears to scale linearly with the size of the input DataFrames.
In practice I have to deal with input data consisting of tens of millions of rows, so I am currently at a loss for a viable solution to the problem.
The FBJ approach consists of the following steps:

1. Create a ts_bin column by binning the timestamp columns, implemented by:

from pyspark.sql import functions as F

bin_size = 10

ddf_event = ddf_event.withColumn(
    'ts_bin',
    F.round(F.col('ts_event') / bin_size)
)

ddf_gps = ddf_gps.withColumn(
    'ts_bin',
    F.round(F.col('ts_gps') / bin_size)
)
2. Join the DataFrames on the ts_bin column, implemented by:

ddf = ddf_event.join(ddf_gps, 'ts_bin', 'left_outer')
3. Determine the minimum timestamp difference per event, implemented by:

from pyspark.sql.window import Window

window = Window.partitionBy('ts_event')

ddf = ddf.withColumn(
    'ts_diff',
    F.abs(F.col('ts_gps') - F.col('ts_event'))
)

ddf = ddf.withColumn(
    'min_ts_diff',
    F.min(F.col('ts_diff')).over(window)
)
4. Filter on the minimum timestamp difference and select the relevant columns, implemented by:

ddf = (
    ddf
    .where(
        (F.col('ts_diff') == F.col('min_ts_diff')) |
        (F.col('ts_diff').isNull())
    )
    .select(
        'ts_event',
        'event',
        'ts_gps',
        'gps_coordinates'
    )
)
The bin_size parameter covers two limiting situations:

- bin_size >> 1 effectively results in a full cross-join
- bin_size = 1 effectively results in a left join on ts_event == ts_gps
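To make the quality trade-off concrete, here is a small illustration with hypothetical timestamps (14 and 16 do not appear in the example data): with bin_size = 10, an event and its truly nearest GPS reading can land in different bins, in which case the FBJ join never even considers the match.

bin_size = 10
print(round(14 / bin_size))  # 1 -> bin of the event
print(round(16 / bin_size))  # 2 -> bin of the nearest GPS reading; the match is missed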
The FSU approach consists of the following steps:
1. Union the two DataFrames on a shared timestamp column, implemented by:

def union(df1, df2):
    cols = list(set(df1.columns).union(set(df2.columns)))
    for col in cols:
        if col not in df1.columns:
            df1 = df1.withColumn(col, F.lit(None))
        if col not in df2.columns:
            df2 = df2.withColumn(col, F.lit(None))
    return df1.select(cols).union(df2.select(cols))

ddf_event = ddf_event.withColumn('timestamp', F.col('ts_event'))
ddf_gps = ddf_gps.withColumn('timestamp', F.col('ts_gps'))

ddf = union(ddf_event, ddf_gps)
2. Determine the previous and next GPS timestamp (and coordinates) for every row, implemented by:

from sys import maxsize

last_window = Window.orderBy(
    F.col('timestamp').asc()).rowsBetween(-maxsize, 0)
first_window = Window.orderBy(
    F.col('timestamp').asc()).rowsBetween(0, maxsize)

ddf = (
    ddf.withColumn(
        'prev_time',
        F.last(F.col('ts_gps'), ignorenulls=True)
        .over(last_window)
    ).withColumn(
        'prev_coordinates',
        F.last(F.col('gps_coordinates'), ignorenulls=True)
        .over(last_window)
    ).withColumn(
        'next_time',
        F.first(F.col('ts_gps'), ignorenulls=True)
        .over(first_window)
    ).withColumn(
        'next_coordinates',
        F.first(F.col('gps_coordinates'), ignorenulls=True)
        .over(first_window)
    )
)
3. Filter on the event rows and pick whichever of the previous or next GPS timestamp is nearer, implemented by:

condition = (F.col('timestamp') - F.col('prev_time')
             < F.col('next_time') - F.col('timestamp'))

ddf = (
    ddf
    .where(F.col('event').isNotNull())
    .withColumn(
        'ts_gps',
        F.when(condition | F.col('next_time').isNull(), F.col('prev_time'))
        .otherwise(F.col('next_time'))
    ).withColumn(
        'gps_coordinates',
        F.when(condition | F.col('next_time').isNull(),
               F.col('prev_coordinates'))
        .otherwise(F.col('next_coordinates'))
    ).select(
        'ts_event',
        'event',
        'ts_gps',
        'gps_coordinates'
    )
)
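With either approach, the result can be inspected and compared against the expected DataFrame shown earlier, for example:

# Order by event timestamp so the output lines up with the expected table.
ddf.orderBy('ts_event').show(truncate=False)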
What you are looking for is a temporal join. Check out the time series Spark library Flint (formerly HuoHua, Spark in Chinese): https://github.com/twosigma/flint
Using this library, given two time-series DataFrames (the documentation explains these objects), you can do the following in PySpark (or Scala Spark):
ddf_event = ...
ddf_gps = ...
result = ddf_event.leftJoin(ddf_gps, tolerance = "1day")
The units of your timestamps were not clear, so set the tolerance according to your needs. You can also do 'future joins' if needed.
Check out their Spark Summit presentation for more explanation and examples: https://youtu.be/g8o5-2lLcvQ
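For completeness, here is a minimal, unverified sketch (based on the patterns in Flint's README; the reader API, expected time column, and tolerance format may differ between Flint versions, so treat these calls as assumptions to check against the documentation) of wiring the example DataFrames into Flint:

from pyspark.sql import functions as F
from ts.flint import FlintContext

flint_context = FlintContext(sqlContext)  # as in Flint's README

# Assumption: Flint expects a timestamp-typed column named 'time'; the integer
# timestamps of the example are treated here as seconds since the epoch.
flint_event = flint_context.read.dataframe(
    ddf_event.withColumn('time', F.col('ts_event').cast('timestamp')))
flint_gps = flint_context.read.dataframe(
    ddf_gps.withColumn('time', F.col('ts_gps').cast('timestamp')))

# leftJoin matches each event with the closest GPS row at or before it within
# the tolerance; futureLeftJoin is the forward-looking counterpart. The
# tolerance string below just follows the '1day' example above; adjust it to
# the real time scale of your data.
result = flint_event.leftJoin(flint_gps, tolerance='1day')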