
Joining Spark DataFrames on a nearest key condition

What’s a performant way to do fuzzy joins in PySpark?

I am looking for the community's views on a scalable approach to joining large Spark DataFrames on a nearest key condition. Allow me to illustrate this problem by means of a representative example. Suppose we have the following Spark DataFrame containing events occurring at some point in time:

ddf_event = spark.createDataFrame(
    data=[
        [1, 'A'],
        [5, 'A'],
        [10, 'B'],
        [15, 'A'],
        [20, 'B'],
        [25, 'B'],
        [30, 'A']
    ],
    schema=['ts_event', 'event']
)

and the following Spark DataFrame containing GPS data measured at some point in time:

ddf_gps = spark.createDataFrame(
    data=[
        [2, '(-46.84635, 173.13674)'],
        [4, '(2.50362, 104.34136)'],
        [8, '(-24.20741, 51.80755)'],
        [15, '(-59.07798, -20.49141)'],
        [18, '(-44.34468, -167.90401)'],
        [24, '(-18.84175, 16.68628)'],
        [27, '(20.48501,58.42423)']
    ],
    schema=['ts_gps', 'gps_coordinates']
)

which we would like to join to produce the following resulting DataFrame:

+--------+-----+------+-----------------------+
|ts_event|event|ts_gps|gps_coordinates        |
+--------+-----+------+-----------------------+
|1       |A    |2     |(-46.84635, 173.13674) |
|5       |A    |4     |(2.50362, 104.34136)   |
|10      |B    |8     |(-24.20741, 51.80755)  |
|15      |A    |15    |(-59.07798, -20.49141) |
|20      |B    |18    |(-44.34468, -167.90401)|
|25      |B    |24    |(-18.84175, 16.68628)  |
|30      |A    |27    |(20.48501,58.42423)    |
+--------+-----+------+-----------------------+

effectively pairing each event with the GPS data point whose timestamp is nearest to the event timestamp.

We thus run into the problem of joining on a nearest key condition, 'nearest' in this case being defined as the smallest absolute difference between timestamps.
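
For reference, the nearest-key condition can be written out directly as a cross join followed by a window minimum over the absolute timestamp difference. This is only a sketch to make the condition concrete; the cross join is exactly what makes it unusable at scale:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Naive baseline: compare every event with every GPS point, then keep
# the pair with the smallest absolute timestamp difference per event.
ddf_naive = ddf_event.crossJoin(ddf_gps).withColumn(
    'ts_diff',
    F.abs(F.col('ts_gps') - F.col('ts_event'))
)

event_window = Window.partitionBy('ts_event')

ddf_naive = (
    ddf_naive
    .withColumn('min_ts_diff', F.min('ts_diff').over(event_window))
    .where(F.col('ts_diff') == F.col('min_ts_diff'))
    .select('ts_event', 'event', 'ts_gps', 'gps_coordinates')
)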

I've explored two approaches to achieve this: one based on a filtered binned join (FBJ) and one based on a filtered sorted union (FSU). Both approaches are described below in more detail.

The FBJ approach depends on the parameter bin_size, which limits the time window in which a matching GPS timestamp may be found. Increasing bin_size increases the computational load, while decreasing it reduces the outcome quality.

Neither approach appears to scale linearly with the size of the input DataFrames.

In practice I have to deal with input data consisting of tens of millions of rows, so I am currently at a loss for a viable solution to the problem.

FBJ approach

The FBJ approach consists of the following steps:

  1. Create a ts_bin column by binning the timestamp columns, implemented by:
from pyspark.sql import functions as F

bin_size = 10
ddf_event = ddf_event.withColumn(
    'ts_bin',
    F.round(F.col('ts_event') / bin_size)
)

ddf_gps = ddf_gps.withColumn(
    'ts_bin',
    F.round(F.col('ts_gps') / bin_size)
)
  2. Join the DataFrames on the ts_bin column, implemented by:
ddf = ddf_event.join(ddf_gps, 'ts_bin', 'left_outer')
  3. Determine the minimum timestamp difference, implemented by:
from pyspark.sql.window import Window

window = Window.partitionBy('ts_event')

ddf = ddf.withColumn(
    'ts_diff',
    F.abs(F.col('ts_gps') - F.col('ts_event'))
)

ddf = ddf.withColumn(
    'min_ts_diff',
    F.min(F.col('ts_diff')).over(window)
)
  4. Filter and select the relevant rows and columns, implemented by:
ddf = (
    ddf
    .where(
        (F.col('ts_diff') == F.col('min_ts_diff')) |
        (F.col('ts_diff').isNull())   
    )
    .select(
        'ts_event',
        'event',
        'ts_gps',
        'gps_coordinates'
    )
)
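
A subtlety of steps 3 and 4: when two GPS timestamps are equally close to an event, the minimum-plus-filter formulation keeps both rows. A minimal variant of the same join that keeps exactly one row per event, sketched under the same assumptions as the code above, uses row_number instead:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rank the candidate GPS points per event by absolute timestamp
# difference and keep only the best-ranked one, so ties cannot
# produce duplicate event rows.
dedup_window = Window.partitionBy('ts_event').orderBy(
    F.abs(F.col('ts_gps') - F.col('ts_event')).asc()
)

ddf = (
    ddf_event.join(ddf_gps, 'ts_bin', 'left_outer')
    .withColumn('rank', F.row_number().over(dedup_window))
    .where(F.col('rank') == 1)
    .select('ts_event', 'event', 'ts_gps', 'gps_coordinates')
)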

Limiting cases for bin_size:

  • bin_size >> 1 effectively results in a full cross-join
  • bin_size = 1 effectively results in a left-join on ts_event == ts_gps
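
To make the quality loss concrete with the sample data above: with bin_size = 10, the event at ts_event = 5 lands in bin round(5 / 10) = 1 (Spark's round uses HALF_UP rounding), while the GPS point at ts_gps = 4 lands in bin round(4 / 10) = 0, so the pair shown in the desired output is never considered; the event is matched to the GPS point at ts_gps = 8 instead, the only candidate in bin 1.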

FSU approach

The FSU approach consists of the following steps:

  1. Union the DataFrames, implemented by:
def union(df1, df2):
    cols = list(set(df1.columns).union(set(df2.columns)))
    for col in cols:
        if col not in df1.columns:
            df1 = df1.withColumn(col, F.lit(None))
        if col not in df2.columns:
            df2 = df2.withColumn(col, F.lit(None))
    return df1.select(cols).union(df2.select(cols))

ddf_event = ddf_event.withColumn('timestamp', F.col('ts_event'))
ddf_gps = ddf_gps.withColumn('timestamp', F.col('ts_gps'))
ddf = union(ddf_event, ddf_gps)
  2. Sort the resulting DataFrame and get the adjacent GPS timestamps (a note on these window definitions follows after the steps), implemented by:
from sys import maxsize

last_window = Window.orderBy(
    F.col('timestamp').asc()).rowsBetween(-maxsize, 0)
first_window = Window.orderBy(
    F.col('timestamp').asc()).rowsBetween(0, maxsize)

ddf = (
    ddf.withColumn(
        'prev_time',
        F.last(F.col('ts_gps'), ignorenulls=True)
         .over(last_window)
    ).withColumn(
        'prev_coordinates',
        F.last(F.col('gps_coordinates'), ignorenulls=True)
         .over(last_window)
    ).withColumn(
        'next_time',
        F.first(F.col('ts_gps'), ignorenulls=True)
         .over(first_window)
    ).withColumn(
        'next_coordinates',
        F.first(F.col('gps_coordinates'), ignorenulls=True)
         .over(first_window)
    )
)
  3. Filter and select the relevant rows and columns, implemented by:
condition = (F.col('timestamp') - F.col('prev_time')
             < F.col('next_time') - F.col('timestamp'))

ddf = (
    ddf
    .where(F.col('event').isNotNull())
    .withColumn(
        'ts_gps',
        F.when(condition | F.col('next_time').isNull(),
               F.col('prev_time'))
         .otherwise(F.col('next_time'))
    ).withColumn(
        'gps_coordinates',
        F.when(condition | F.col('next_time').isNull(),
               F.col('prev_coordinates'))
         .otherwise(F.col('next_coordinates'))
    ).select(
        'ts_event',
        'event',
        'ts_gps',
        'gps_coordinates'
    )
)
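
As a side note on the window definitions in step 2: the same frames can be written with Spark's built-in unbounded markers, and since they use orderBy without partitionBy, Spark moves all rows into a single partition to evaluate them (it logs the warning "No Partition Defined for Window operation"), which is presumably a large part of why this approach does not scale. An equivalent sketch of the two windows:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Same frames as above, expressed with the built-in unbounded markers.
# Note the absence of partitionBy: Spark evaluates such windows on a
# single partition.
last_window = Window.orderBy(F.col('timestamp').asc()) \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
first_window = Window.orderBy(F.col('timestamp').asc()) \
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)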
Asked Sep 24 '19 by Wouter Hordijk



1 Answer

What you are looking for is a temporal join. Check out the time-series Spark library Flint (formerly HuoHua, "spark" in Chinese): https://github.com/twosigma/flint

Using this library, for two given time-series DataFrames (the documentation explains these objects), you can do the following in PySpark (or Scala Spark):

ddf_event = ...
ddf_gps = ...
result = ddf_event.leftJoin(ddf_gps, tolerance="1day")

The units of your timestamps were not clear, so set the tolerance according to your needs. You can also do 'future joins' if needed.
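
For completeness, a rough sketch of how the pieces fit together in PySpark, based on the project README; treat the reader API details (FlintContext, the read.dataframe call, and the requirement that the time column be named 'time') as assumptions to verify against the version you install:

from ts.flint import FlintContext

flint_context = FlintContext(sqlContext)

# Assumption: Flint expects the timestamp column to be named 'time',
# so the original ts_event / ts_gps columns are renamed here.
flint_event = flint_context.read.dataframe(
    ddf_event.withColumnRenamed('ts_event', 'time'))
flint_gps = flint_context.read.dataframe(
    ddf_gps.withColumnRenamed('ts_gps', 'time'))

# Temporal left join within the given tolerance; check the Flint docs
# for the exact matching semantics (nearest past row vs. nearest row).
result = flint_event.leftJoin(flint_gps, tolerance="1day")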

Check out their Spark Summit presentation for more explanation and examples: https://youtu.be/g8o5-2lLcvQ

Answered by DaReal