Spark dataframe join with range slow

I have the following input data (in Parquet) for a Spark job:

Person (millions of rows)
+---------+----------+---------------+---------------+
|  name   | location |     start     |      end      |
+---------+----------+---------------+---------------+
| Person1 |     1230 | 1478630000001 | 1478630000010 |
| Person2 |     1230 | 1478630000002 | 1478630000012 |
| Person2 |     1230 | 1478630000013 | 1478630000020 |
| Person3 |     3450 | 1478630000001 | 1478630000015 |
+---------+----------+---------------+---------------+


Event (millions of rows)
+----------+----------+---------------+
|  event   | location |  start_time   |
+----------+----------+---------------+
| Biking   |     1230 | 1478630000005 |
| Skating  |     1230 | 1478630000014 |
| Baseball |     3450 | 1478630000015 |
+----------+----------+---------------+

and I need to transform it into the following expected outcome:

[{
    "name" : "Biking",
    "persons" : ["Person1", "Person2"]
},
{
    "name" : "Skating",
    "persons" : ["Person2"]
},
{
    "name" : "Baseball",
    "persons" : ["Person3"]
}]

In words: the result is a list of events, each with a list of the persons who participated in that event.

A person counts as a participant if

Person.start <= Event.start_time 
&& Person.end >= Event.start_time
&& Person.location == Event.location

(The bounds are inclusive; otherwise Person3, whose range ends exactly at Baseball's start_time, would not appear in the expected outcome.)

I have tried different approaches, but the only one that actually seems to work is to join the two DataFrames and then group/aggregate them by event (the grouping step is sketched after the join code below). The join, however, is extremely slow and does not distribute well across multiple CPU cores.

Current code for the join:

import static org.apache.spark.sql.functions.col;

// Join persons to events on equal location, where the event's
// start_time falls within the person's [start, end] interval.
final DataFrame fullFrame = persons.as("persons")
    .join(events.as("events"), col("persons.location").equalTo(col("events.location"))
               .and(col("events.start_time").geq(col("persons.start")))
               .and(col("events.start_time").leq(col("persons.end"))), "inner");

// count() to trigger an action
fullFrame.count();
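
For completeness, the grouping/aggregation step that follows the join can be sketched like this (collect_list lives in org.apache.spark.sql.functions and, in Spark 1.6, requires a HiveContext; the output path is a placeholder):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;

// Group the joined rows by event and collect the matching
// person names into one list per event.
final DataFrame result = fullFrame
    .groupBy(col("events.event"))
    .agg(collect_list(col("persons.name")).as("persons"))
    .withColumnRenamed("event", "name");

// Write the result as JSON, one object per event:
// result.write().json("/path/to/output");  // placeholder path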

I am using Spark Standalone and Java, if this makes a difference.

Does anybody have a better idea of how to solve this problem with Spark 1.6.2?

asked by Schäbo

1 Answer

Range joins are executed as a cross product with a subsequent filter step. A potentially better solution is to broadcast the (presumably smaller) events table and then map over the persons table: inside the map, check the join condition against each broadcast event and emit the matching results.
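
A minimal sketch of that idea in Java (this assumes a JavaSparkContext named jsc and Java 8 lambdas; column positions and types follow the tables above and may need adjusting, e.g. if location is stored as an int):

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;

import scala.Tuple2;

// Collect the small events table to the driver and broadcast it
// to every executor.
final List<Row> eventRows =
    events.select("event", "location", "start_time").collectAsList();
final Broadcast<List<Row>> broadcastEvents = jsc.broadcast(eventRows);

// Scan the persons table once; for each person, emit one
// (event, name) pair per broadcast event that matches the
// location and time-range condition.
final JavaPairRDD<String, String> pairs = persons
    .select("name", "location", "start", "end")
    .javaRDD()
    .flatMapToPair(person -> {
        final List<Tuple2<String, String>> out = new ArrayList<>();
        final String name = person.getString(0);
        final long location = person.getLong(1);   // assumed long
        final long start = person.getLong(2);
        final long end = person.getLong(3);
        for (Row event : broadcastEvents.value()) {
            if (location == event.getLong(1)        // same location
                    && start <= event.getLong(2)    // start <= start_time
                    && end >= event.getLong(2)) {   // end >= start_time
                out.add(new Tuple2<>(event.getString(0), name));
            }
        }
        return out;
    });

// Group the participant names per event.
final JavaPairRDD<String, Iterable<String>> participantsByEvent =
    pairs.groupByKey();

This replaces the shuffle-heavy range join with a linear, fully parallel scan of persons; it only works as long as the events table is small enough to be collected and broadcast.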

answered by Elmar Macek