 

Find the closest time between two tables in Spark

I am using PySpark and I have two DataFrames like this:

user         time          bus
 A    2016/07/18 12:00:00   1
 B    2016/07/19 12:00:00   2
 C    2016/07/20 12:00:00   3

bus          time          stop
 1    2016/07/18 11:59:40   sA
 1    2016/07/18 11:59:50   sB
 1    2016/07/18 12:00:05   sC
 2    2016/07/19 11:59:40   sB
 2    2016/07/19 12:00:10   sC
 3    2016/07/20 11:59:55   sD
 3    2016/07/20 12:00:10   sE

Now I want to find the stop at which each user reports: among the records in the second table with the same bus number, the one whose time is closest to the user's report time.

For example, in table 1, user A reports at 2016/07/18 12:00:00 on bus No.1. The second table has three records for bus No.1, and the closest time is 2016/07/18 12:00:05 (the third record), so the user is at sC.
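The rule in this example is simply "smallest absolute time difference among the records of the same bus". A minimal plain-Python sketch of that rule for user A (no Spark involved; `bus1_records` is just a hypothetical name for the three bus No.1 rows of the second table):

```python
from datetime import datetime

FMT = "%Y/%m/%d %H:%M:%S"

# User A's report time and the three bus No.1 records from the second table
report = datetime.strptime("2016/07/18 12:00:00", FMT)
bus1_records = [
    ("2016/07/18 11:59:40", "sA"),
    ("2016/07/18 11:59:50", "sB"),
    ("2016/07/18 12:00:05", "sC"),
]

# Absolute difference in seconds between the report and each bus record
diffs = [
    (abs((datetime.strptime(t, FMT) - report).total_seconds()), stop)
    for t, stop in bus1_records
]

# Tuples compare by difference first, so min() picks the closest record
closest_diff, closest_stop = min(diffs)
print(diffs)         # [(20.0, 'sA'), (10.0, 'sB'), (5.0, 'sC')]
print(closest_stop)  # sC
```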

The desired output should be like this:

user         time          bus  stop
 A    2016/07/18 12:00:00   1    sC
 B    2016/07/19 12:00:00   2    sC
 C    2016/07/20 12:00:00   3    sD

I have already converted the time columns to timestamps, so the only remaining problem is finding the closest timestamp among the records with the same bus number.

Because I'm not familiar with SQL yet, I tried to use a map function to find the closest time and its stop. That means calling sqlContext.sql inside the map function, and Spark doesn't seem to allow this:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

So how can I write a SQL query to get the right output?
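One way to express this as a single SQL query is: join the two tables on the bus number, number each report's candidate rows with `ROW_NUMBER()` ordered by the absolute time difference, and keep row 1. The sketch below runs that query with Python's built-in `sqlite3` module (assuming SQLite 3.25 or newer, which added window functions) so it works without a cluster; the table names `user_time` and `bus_time` are hypothetical. Spark SQL should accept a very similar query, with `unix_timestamp(...)` in place of SQLite's `strftime('%s', ...)` for the date arithmetic.

```python
import sqlite3

# Hypothetical table names; times are stored as "YYYY-MM-DD HH:MM:SS" strings
# so that SQLite's strftime() can parse them.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE user_time (user TEXT, time TEXT, bus INTEGER);
CREATE TABLE bus_time  (bus INTEGER, time TEXT, stop TEXT);
""")
conn.executemany("INSERT INTO user_time VALUES (?, ?, ?)", [
    ("A", "2016-07-18 12:00:00", 1),
    ("B", "2016-07-19 12:00:00", 2),
    ("C", "2016-07-20 12:00:00", 3),
])
conn.executemany("INSERT INTO bus_time VALUES (?, ?, ?)", [
    (1, "2016-07-18 11:59:40", "sA"),
    (1, "2016-07-18 11:59:50", "sB"),
    (1, "2016-07-18 12:00:05", "sC"),
    (2, "2016-07-19 11:59:40", "sB"),
    (2, "2016-07-19 12:00:10", "sC"),
    (3, "2016-07-20 11:59:55", "sD"),
    (3, "2016-07-20 12:00:10", "sE"),
])

# Join on bus, rank each report's candidate stops by |bus time - report time|
# (in seconds), and keep only the closest one (rn = 1).
query = """
SELECT user, time, bus, stop
FROM (
    SELECT u.user AS user, u.time AS time, u.bus AS bus, b.stop AS stop,
           ROW_NUMBER() OVER (
               PARTITION BY u.user, u.time
               ORDER BY ABS(CAST(strftime('%s', b.time) AS INTEGER)
                          - CAST(strftime('%s', u.time) AS INTEGER))
           ) AS rn
    FROM user_time u
    JOIN bus_time b ON u.bus = b.bus
) AS ranked
WHERE rn = 1
ORDER BY user
"""
rows = conn.execute(query).fetchall()
for row in rows:
    print(row)
```

Partitioning by the report (user plus report time) rather than by user alone keeps one result per report if the same user reports more than once.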

asked Jul 27 '16 at 21:07 by Finn



1 Answer

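This can be done by joining the two DataFrames on the bus number and then using a window function to keep, for each user, the row with the smallest time difference.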
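The pipeline has three steps: an inner join on the bus number, a per-(user, bus) partition, and "keep the row ranked 1 by time difference". A plain-Python sketch of those same steps, without Spark, to make the logic explicit (`closest_stops` and `ts` are hypothetical helpers; times are parsed into `datetime` objects first, mirroring the timestamp conversion in the question):

```python
from collections import defaultdict
from datetime import datetime


def ts(s):
    # Same "yyyy/MM/dd HH:mm:ss" format as the question's tables
    return datetime.strptime(s, "%Y/%m/%d %H:%M:%S")


users = [("A", ts("2016/07/18 12:00:00"), 1),
         ("B", ts("2016/07/19 12:00:00"), 2),
         ("C", ts("2016/07/20 12:00:00"), 3)]
buses = [(1, ts("2016/07/18 11:59:40"), "sA"),
         (1, ts("2016/07/18 11:59:50"), "sB"),
         (1, ts("2016/07/18 12:00:05"), "sC"),
         (2, ts("2016/07/19 11:59:40"), "sB"),
         (2, ts("2016/07/19 12:00:10"), "sC"),
         (3, ts("2016/07/20 11:59:55"), "sD"),
         (3, ts("2016/07/20 12:00:10"), "sE")]


def closest_stops(users, buses):
    # 1. Inner join on the bus number, adding the absolute difference in seconds
    joined = [(user, bus, stop, abs((b_time - u_time).total_seconds()))
              for user, u_time, bus in users
              for b_bus, b_time, stop in buses
              if b_bus == bus]
    # 2. Partition the joined rows by (user, bus)
    partitions = defaultdict(list)
    for row in joined:
        partitions[(row[0], row[1])].append(row)
    # 3. In each partition keep the row with the smallest difference,
    #    i.e. the row that row_number() ordered by the difference numbers 1
    best = [min(rows, key=lambda r: r[3]) for rows in partitions.values()]
    return sorted((user, bus, stop) for user, bus, stop, _diff in best)


result = closest_stops(users, buses)
print(result)  # [('A', 1, 'sC'), ('B', 2, 'sC'), ('C', 3, 'sD')]
```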

from datetime import datetime

from pyspark.sql import Row, SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

def tm(s):
    # Parse the "yyyy/MM/dd HH:mm:ss" strings from the question into datetimes
    return datetime.strptime(s, "%Y/%m/%d %H:%M:%S")

# setup data
userTime = [
    Row(user="A", time=tm("2016/07/18 12:00:00"), bus=1),
    Row(user="B", time=tm("2016/07/19 12:00:00"), bus=2),
    Row(user="C", time=tm("2016/07/20 12:00:00"), bus=3),
]

busTime = [
    Row(bus=1, time=tm("2016/07/18 11:59:40"), stop="sA"),
    Row(bus=1, time=tm("2016/07/18 11:59:50"), stop="sB"),
    Row(bus=1, time=tm("2016/07/18 12:00:05"), stop="sC"),
    Row(bus=2, time=tm("2016/07/19 11:59:40"), stop="sB"),
    Row(bus=2, time=tm("2016/07/19 12:00:10"), stop="sC"),
    Row(bus=3, time=tm("2016/07/20 11:59:55"), stop="sD"),
    Row(bus=3, time=tm("2016/07/20 12:00:10"), stop="sE"),
]

# create DataFrames, aliased so the shared column names (bus, time) can be qualified
userDf = spark.createDataFrame(userTime).alias("usertime")
busDf = spark.createDataFrame(busTime).alias("bustime")

# pair every user report with every record of the same bus
joinedDF = userDf.join(busDf, F.col("usertime.bus") == F.col("bustime.bus"), "inner").select(
    F.col("usertime.user").alias("user"),
    F.col("usertime.time").alias("user_time"),
    F.col("bustime.bus").alias("bus"),
    F.col("bustime.time").alias("bus_time"),
    F.col("bustime.stop").alias("stop"))

# absolute difference in seconds between the report time and the bus record time
additional_cols = joinedDF.withColumn(
    "bus_time_diff",
    F.abs(F.unix_timestamp("bus_time") - F.unix_timestamp("user_time")))

# number the rows of each (user, bus) group by time difference and keep the closest
window = Window.partitionBy("user", "bus").orderBy("bus_time_diff")
partDf = (additional_cols
          .withColumn("rank", F.row_number().over(window))
          .filter(F.col("rank") == 1))

additional_cols.show(20, False)
partDf.show(20, False)

Output:

+----+---------------------+---+---------------------+----+-------------+
|user|user_time            |bus|bus_time             |stop|bus_time_diff|
+----+---------------------+---+---------------------+----+-------------+
|A   |2016-07-18 12:00:00.0|1  |2016-07-18 11:59:40.0|sA  |20           |
|A   |2016-07-18 12:00:00.0|1  |2016-07-18 11:59:50.0|sB  |10           |
|A   |2016-07-18 12:00:00.0|1  |2016-07-18 12:00:05.0|sC  |5            |
|B   |2016-07-19 12:00:00.0|2  |2016-07-19 11:59:40.0|sB  |20           |
|B   |2016-07-19 12:00:00.0|2  |2016-07-19 12:00:10.0|sC  |10           |
|C   |2016-07-20 12:00:00.0|3  |2016-07-20 11:59:55.0|sD  |5            |
|C   |2016-07-20 12:00:00.0|3  |2016-07-20 12:00:10.0|sE  |10           |
+----+---------------------+---+---------------------+----+-------------+

+----+---------------------+---+---------------------+----+-------------+----+
|user|user_time            |bus|bus_time             |stop|bus_time_diff|rank|
+----+---------------------+---+---------------------+----+-------------+----+
|A   |2016-07-18 12:00:00.0|1  |2016-07-18 12:00:05.0|sC  |5            |1   |
|B   |2016-07-19 12:00:00.0|2  |2016-07-19 12:00:10.0|sC  |10           |1   |
|C   |2016-07-20 12:00:00.0|3  |2016-07-20 11:59:55.0|sD  |5            |1   |
+----+---------------------+---+---------------------+----+-------------+----+
answered Oct 02 '22 at 17:10 by Ravi
