Table A has many columns, including a date column. Table B has a datetime and a value. The data in both tables is generated sporadically, with no regular interval. Table A is small, table B is massive.
I need to join B to A under the condition that a given element a of A.datetime corresponds to
B[B['datetime'] <= a]['datetime'].max()
There are a couple of ways to do this, but I would like the most efficient way.
1. Broadcast the small dataset as a pandas DataFrame. Set up a Spark UDF that creates a pandas DataFrame for each chunk of the large dataset and merges it with the broadcast table using merge_asof (a rough sketch is below).
2. Use the broadcast join functionality of Spark SQL: set up a theta join on the condition
B['datetime'] <= A['datetime']
and then eliminate all the superfluous rows (also sketched below).
The second option seems pretty terrible... but please let me know if the first way is efficient or if there is another way.
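Here is roughly what I mean by the first option. It is an untested sketch, not working code: df1 and df2 stand for the sample tables A and B from the edit below, spark is the SparkSession, and it relies on mapInPandas, which needs Spark 3.0+ and PyArrow. Because merge_asof only sees one chunk of B at a time, a final reduce keeps the latest candidate per A row.
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Small table A: collect it once, convert the key to pandas timestamps, broadcast it.
a_pd = df1.toPandas()
a_pd['Datetime'] = pd.to_datetime(a_pd['Datetime'])
a_bc = spark.sparkContext.broadcast(a_pd.sort_values('Datetime'))
def asof_candidates(batches):
    # batches is an iterator of pandas DataFrames, each holding one chunk of B
    for b_pd in batches:
        b_pd['Datetime'] = pd.to_datetime(b_pd['Datetime'])
        b_pd = b_pd.sort_values('Datetime')
        b_pd['B_Datetime'] = b_pd['Datetime']  # keep B's datetime for the final reduce
        # For each A row: the last B row in this chunk with Datetime <= A.Datetime
        cand = pd.merge_asof(a_bc.value, b_pd, on='Datetime', direction='backward')
        cand = cand.dropna(subset=['Key']).astype({'Key': 'int64'})
        yield cand[['Column1', 'Key', 'B_Datetime']]
candidates = df2.mapInPandas(asof_candidates,
                             schema='Column1 string, Key long, B_Datetime timestamp')
# Each chunk only saw part of B, so keep the latest candidate per A row.
w = Window.partitionBy('Column1').orderBy(F.col('B_Datetime').desc())
candidates.withColumn('rn', F.row_number().over(w)).filter('rn = 1').select('Column1', 'Key').show()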
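And a rough sketch of the second option (again df1/df2 are the sample tables from the edit below, and I assume Column1 identifies a row of A): broadcast the small table, pair every A row with all earlier-or-equal B rows through the non-equi condition, then prune everything but the latest match.
from pyspark.sql import functions as F
b = df2.withColumnRenamed('Datetime', 'B_Datetime')  # avoid an ambiguous column name
# Non-equi condition with the small table broadcast; A rows with no earlier B row are dropped.
joined = b.join(F.broadcast(df1), b['B_Datetime'] <= df1['Datetime'])
# Eliminate the superfluous rows: per A row, keep only the B row with the largest B_Datetime.
result = (joined
          .groupBy('Column1')
          .agg(F.max(F.struct('B_Datetime', 'Key')).alias('m'))
          .select('Column1', F.col('m.Key').alias('Key')))
result.show()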
EDIT: Here is the sample input and expected output:
A =
+---------+----------+
| Column1 | Datetime |
+---------+----------+
| A |2019-02-03|
| B |2019-03-14|
+---------+----------+
B =
+---------+----------+
| Key | Datetime |
+---------+----------+
| 0 |2019-01-01|
| 1 |2019-01-15|
| 2 |2019-02-01|
| 3 |2019-02-15|
| 4 |2019-03-01|
| 5 |2019-03-15|
+---------+----------+
custom_join(A,B) =
+---------+----------+
| Column1 | Key |
+---------+----------+
| A | 2 |
| B | 4 |
+---------+----------+
To reproduce the sample tables, first create a list of rows and a list of column names, then pass both to the spark.createDataFrame() method.
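For example (I am naming them df1 and df2 so they line up with the answer below; the column types are just inferred from the sample data):
import datetime
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Table A
df1 = spark.createDataFrame(
    [('A', datetime.date(2019, 2, 3)), ('B', datetime.date(2019, 3, 14))],
    ['Column1', 'Datetime'])
# Table B
df2 = spark.createDataFrame(
    [(0, datetime.date(2019, 1, 1)), (1, datetime.date(2019, 1, 15)),
     (2, datetime.date(2019, 2, 1)), (3, datetime.date(2019, 2, 15)),
     (4, datetime.date(2019, 3, 1)), (5, datetime.date(2019, 3, 15))],
    ['Key', 'Datetime'])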
If you go down the pandas UDF route, note that PyArrow is required. If you install PySpark using pip, PyArrow can be brought in as an extra dependency of the SQL module with pip install pyspark[sql]; otherwise you must ensure that PyArrow is installed and available on all cluster nodes, either with pip or with conda from the conda-forge channel.
I doubt that it is faster, but you could solve it with Spark by using union
and last
together with a window
function.
from pyspark.sql import functions as f
from pyspark.sql.window import Window
# Give both frames the same columns so they can be unioned:
# A rows get a null Key, B rows get a null Column1.
df1 = df1.withColumn('Key', f.lit(None))
df2 = df2.withColumn('Column1', f.lit(None))
df3 = df1.unionByName(df2)
# Window over all rows before the current one, ordered by Datetime
# (on a tie the B row sorts first, since its Column1 is null).
w = Window.orderBy('Datetime', 'Column1').rowsBetween(Window.unboundedPreceding, -1)
# For each A row, take the last non-null Key seen so far, i.e. the Key of the
# latest B row with Datetime <= the A row's, then keep only the A rows.
df3.withColumn('Key', f.last('Key', True).over(w)).filter(~f.isnull('Column1')).show()
Which gives
+-------+----------+---+
|Column1| Datetime|Key|
+-------+----------+---+
| A|2019-02-03| 2|
| B|2019-03-14| 4|
+-------+----------+---+
It's an old question but maybe still useful for somebody.