How do you create merge_asof functionality in PySpark?

Table A has many columns, including a date column; Table B has a datetime and a value. The data in both tables is generated sporadically, with no regular interval. Table A is small, table B is massive.

I need to join B to A under the condition that each element a of A.datetime is matched with the row of B whose datetime is

B[B['datetime'] <= a]['datetime'].max()

There are a couple ways to do this, but I would like the most efficient way.

Option 1

Broadcast the small dataset as a pandas DataFrame. Set up a Spark UDF that creates a pandas DataFrame for each chunk of the large dataset and merges it with the broadcast DataFrame using merge_asof.
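
For what it's worth, a rough sketch of how Option 1 could look with applyInPandas (Spark 3.x). Column names follow the sample data in the edit below; the chunking by spark_partition_id and the final window step are my additions to resolve matches across chunks, and it assumes Column1 uniquely identifies a row of A:

import pandas as pd
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# A is small, so pull it to the driver as pandas; Spark ships it to the
# executors as part of the UDF's closure.
a_pdf = A.toPandas()
a_pdf['Datetime'] = pd.to_datetime(a_pdf['Datetime'])
a_pdf = a_pdf.sort_values('Datetime')

def asof_per_chunk(pdf):
    # pdf is one chunk of B; merge_asof needs both sides sorted on the key.
    pdf = pdf.rename(columns={'Datetime': 'B_Datetime'})
    pdf['B_Datetime'] = pd.to_datetime(pdf['B_Datetime'])
    pdf = pdf.sort_values('B_Datetime')
    merged = pd.merge_asof(a_pdf, pdf, left_on='Datetime',
                           right_on='B_Datetime', direction='backward')
    return merged.dropna(subset=['Key'])[['Column1', 'Key', 'B_Datetime']]

# One candidate match per chunk of B ...
candidates = (B.withColumn('chunk', f.spark_partition_id())
                .groupBy('chunk')
                .applyInPandas(asof_per_chunk,
                               'Column1 string, Key double, B_Datetime timestamp'))

# ... then keep only the latest candidate per row of A across all chunks.
w = Window.partitionBy('Column1').orderBy(f.col('B_Datetime').desc())
result = (candidates.withColumn('rn', f.row_number().over(w))
                    .filter('rn = 1')
                    .select('Column1', 'Key'))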

Option 2

Use the broadcast join functionality of Spark SQL: set up a theta join on the following condition

B['datetime'] <= A['datetime']

Then eliminate all the superfluous rows.
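
A minimal sketch of what Option 2 could look like (column names follow the sample data in the edit below; the window step is one way to drop the superfluous rows):

from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Broadcast the small table A and theta-join it against B.
joined = (B.alias('b')
            .join(f.broadcast(A.alias('a')),
                  f.col('b.Datetime') <= f.col('a.Datetime')))

# Eliminate the superfluous rows: per A row, keep only the B row with the
# largest Datetime.
w = Window.partitionBy(f.col('a.Column1')).orderBy(f.col('b.Datetime').desc())
result = (joined.withColumn('rn', f.row_number().over(w))
                .filter('rn = 1')
                .select(f.col('a.Column1'), f.col('b.Key')))

Because the join condition is a range rather than an equality, Spark falls back to a broadcast nested-loop join here, so every row of B gets compared against every row of A.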

Option 2 seems pretty terrible... but please let me know if the first way is efficient or if there is another way.

EDIT: Here is the sample input and expected output:

A =
+---------+----------+
| Column1 | Datetime |
+---------+----------+
|    A    |2019-02-03|
|    B    |2019-03-14|
+---------+----------+

B =
+---------+----------+
|   Key   | Datetime |
+---------+----------+
|    0    |2019-01-01|
|    1    |2019-01-15|
|    2    |2019-02-01|
|    3    |2019-02-15|
|    4    |2019-03-01|
|    5    |2019-03-15|
+---------+----------+

custom_join(A,B) =
+---------+----------+
| Column1 |   Key    |
+---------+----------+
|    A    |     2    |
|    B    |     4    |
+---------+----------+
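
For reference, this is exactly what pandas merge_asof with direction='backward' does; a minimal pandas-only reproduction of the expected output (assuming the dates are parsed with to_datetime) is:

import pandas as pd

A_pdf = pd.DataFrame({'Column1': ['A', 'B'],
                      'Datetime': pd.to_datetime(['2019-02-03', '2019-03-14'])})
B_pdf = pd.DataFrame({'Key': [0, 1, 2, 3, 4, 5],
                      'Datetime': pd.to_datetime(['2019-01-01', '2019-01-15',
                                                  '2019-02-01', '2019-02-15',
                                                  '2019-03-01', '2019-03-15'])})

# Both frames must be sorted on the join key.
out = pd.merge_asof(A_pdf.sort_values('Datetime'),
                    B_pdf.sort_values('Datetime'),
                    on='Datetime', direction='backward')
print(out[['Column1', 'Key']])
#   Column1  Key
# 0       A    2
# 1       B    4
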
asked Aug 09 '19 by Collin Cunningham


1 Answer

I doubt that it is faster, but you could solve it with Spark by using union and last together with a window function.

# df1 corresponds to table A and df2 to table B from the question.
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Give both frames the same columns so they can be unioned.
df1 = df1.withColumn('Key', f.lit(None))
df2 = df2.withColumn('Column1', f.lit(None))

df3 = df1.unionByName(df2)

# For each row, look at everything before it in Datetime order and take the
# last non-null Key (f.last with ignorenulls=True), i.e. the most recent B key.
# Note there is no partitionBy, so the window moves all data to one partition.
w = Window.orderBy('Datetime', 'Column1').rowsBetween(Window.unboundedPreceding, -1)
df3.withColumn('Key', f.last('Key', True).over(w)).filter(~f.isnull('Column1')).show()

Which gives

+-------+----------+---+
|Column1|  Datetime|Key|
+-------+----------+---+
|      A|2019-02-03|  2|
|      B|2019-03-14|  4|
+-------+----------+---+
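
Here df1 and df2 correspond to A and B from the question; an illustrative way to create them for testing (not part of the original snippet) would be:

import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Table A from the question
df1 = spark.createDataFrame(
    [('A', datetime.date(2019, 2, 3)), ('B', datetime.date(2019, 3, 14))],
    ['Column1', 'Datetime'])

# Table B from the question
df2 = spark.createDataFrame(
    [(0, datetime.date(2019, 1, 1)), (1, datetime.date(2019, 1, 15)),
     (2, datetime.date(2019, 2, 1)), (3, datetime.date(2019, 2, 15)),
     (4, datetime.date(2019, 3, 1)), (5, datetime.date(2019, 3, 15))],
    ['Key', 'Datetime'])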

It's an old question but maybe still useful for somebody.

answered Oct 19 '22 by ScootCork