
Spark: How to join RDDs by time range

I have a delicate Spark problem that I just can't wrap my head around.

We have two RDDs (coming from Cassandra). RDD1 contains Actions and RDD2 contains Historic data. Both have an id on which they can be matched/joined. The problem is that the two tables have an N:N relationship: Actions contains multiple rows with the same id, and so does Historic. Here is some example data from both tables.

Actions (time is actually a timestamp)

id  |  time  | valueX
1   |  12:05 | 500
1   |  12:30 | 500
2   |  12:30 | 125

Historic (set_at is actually a timestamp)

id  |  set_at| valueY
1   |  11:00 | 400
1   |  12:15 | 450
2   |  12:20 | 50
2   |  12:25 | 75

How can we join these two tables so that we get a result like this:

1   |  100  # 500 - 400 for Actions#1 with time 12:05 because Historic was in that time at 400
1   |  50   # 500 - 450 for Actions#2 with time 12:30 because H. was in that time at 450
2   |  50   # 125 - 75  for Actions#3 with time 12:30 because H. was in that time at 75

I can't come up with a good solution that feels right without making a lot of iterations over huge datasets. I keep thinking about building ranges from the Historic set and then somehow checking whether an Action fits into a range, e.g. (11:00 - 12:15), to make the calculation. But that seems pretty slow to me. Is there a more efficient way to do that? It seems to me that this kind of problem should be common, but I couldn't find any hints on it yet. How would you solve this problem in Spark?

My current attempt so far (half-finished code):

case class Historic(id: String, set_at: Long, valueY: Int)
val historicRDD = sc.cassandraTable[Historic](...)

historicRDD
.map( row => ( row.id, row ) )
.reduceByKey(...) 
// transforming to another case which results in something like this; code not finished yet
// (List((Range(0, 12:25), 400), (Range(12:25, NOW), 450)))

// From here we could join with Actions
// And then some .filter maybe to select the right Lists tuple
M. Hirn asked Nov 25 '14 22:11

1 Answer

It's an interesting problem. I also spent some time figuring out an approach. This is what I came up with:

Given case classes for Action(id, time, x) and Historic(id, time, y)

  • Join the actions with the history on id (this might be heavy, since it produces every action/historic pair per id)
  • filter out the historic records set at or after the action's time, as they are not relevant for that action
  • key the results by (id, time) to differentiate actions with the same id at different times
  • reduce the history per action to the record with the latest time, leaving us with the relevant historical record for the given action
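
The four steps above can be traced on plain Scala collections before moving to RDDs. This is only a minimal sketch, not the Spark code itself: the field names follow the case classes named above, times are assumed to be seconds since midnight, and `localReport`, `sampleActions` and `sampleHistoric` are hypothetical names introduced for illustration.

```scala
// Field names follow Action(id, time, x) and Historic(id, time, y) from the text.
// Assumption: times are seconds since midnight (12:05 -> 43500).
case class Action(id: Int, time: Long, x: Int)
case class Historic(id: Int, time: Long, y: Int)

val sampleActions = Seq(
  Action(1, 43500L, 500), // 12:05
  Action(1, 45000L, 500), // 12:30
  Action(2, 45000L, 125)  // 12:30
)
val sampleHistoric = Seq(
  Historic(1, 39600L, 400), // 11:00
  Historic(1, 44100L, 450), // 12:15
  Historic(2, 44400L, 50),  // 12:20
  Historic(2, 44700L, 75)   // 12:25
)

// Hypothetical helper: the four steps on local collections.
def localReport(actions: Seq[Action], historic: Seq[Historic]): Seq[(Int, Long, Int)] = {
  // 1. join on id: every action paired with every historic record of that id
  val joined = for {
    a <- actions
    h <- historic if h.id == a.id
  } yield (a, h)
  // 2. drop historic records that were not yet set at the action's time
  val relevant = joined.filter { case (a, h) => a.time > h.time }
  // 3. key by (id, time) so that actions sharing an id stay separate
  val byAction = relevant.groupBy { case (a, _) => (a.id, a.time) }
  // 4. per action, keep the latest historic record and compute x - y
  byAction.toSeq.map { case ((id, time), pairs) =>
    val (a, h) = pairs.maxBy { case (_, hist) => hist.time }
    (id, time, a.x - h.y)
  }.sorted
}
```

Calling `localReport(sampleActions, sampleHistoric)` on the question's data should give one row per action, each with the difference against the latest earlier historic value.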

In Spark:

val actionById = actions.keyBy(_.id)
val historyById = historic.keyBy(_.id)
val actionByHistory = actionById.join(historyById)
// keep only the historic records set before the action, keyed by (id, time)
val filteredActionByIdTime = actionByHistory.collect {
  case (_, (action, historic)) if action.time > historic.time =>
    ((action.id, action.time), (action, historic))
}
// per action, keep the most recent of the remaining historic records
val topHistoricByAction = filteredActionByIdTime.reduceByKey {
  case ((a1, h1), (_, h2)) => (a1, if (h1.time > h2.time) h1 else h2)
}

// we are done, let's produce a report now
val report = topHistoricByAction.map { case ((id, time), (action, historic)) => (id, time, action.x - historic.y) }

Using the data provided above, the report looks like:

report.collect
Array[(Int, Long, Int)] = Array((1,43500,100), (1,45000,50), (2,45000,50))

(I transformed the time to seconds to have a simplistic timestamp)
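
The join above materialises every (action, historic) pair per id before filtering, which is the part flagged as potentially heavy. One possible alternative, which is not part of the approach above, is to group the records per id and look up the latest historic record for each action in a time-sorted list. The sketch below shows only that per-id lookup on local collections. `latestBefore` and `diffsForId` are hypothetical names, and it assumes that the history of a single id fits in memory.

```scala
case class Action(id: Int, time: Long, x: Int)
case class Historic(id: Int, time: Long, y: Int)

// Hypothetical helper: binary search for the last record with time < t
// in a history that is already sorted by ascending time.
def latestBefore(sortedHistory: IndexedSeq[Historic], t: Long): Option[Historic] = {
  var lo = 0
  var hi = sortedHistory.length // search window is [lo, hi)
  while (lo < hi) {
    val mid = (lo + hi) >>> 1
    if (sortedHistory(mid).time < t) lo = mid + 1 else hi = mid
  }
  // lo is now the number of records strictly before t
  if (lo == 0) None else Some(sortedHistory(lo - 1))
}

// Hypothetical helper: all (id, time, x - y) rows for the groups of one id.
// Actions with no earlier historic record are dropped, as in the join version.
def diffsForId(actions: Iterable[Action], history: Iterable[Historic]): Seq[(Int, Long, Int)] = {
  val sorted = history.toVector.sortBy(_.time) // sort once per id
  actions.toSeq.flatMap { a =>
    latestBefore(sorted, a.time).map(h => (a.id, a.time, a.x - h.y))
  }
}
```

On RDDs, the per-id groups could come from `cogroup`, for example `actionById.cogroup(historyById).flatMap { case (_, (as, hs)) => diffsForId(as, hs) }`. Whether this is faster than the join depends on how skewed the ids are, so it is worth measuring on real data.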

maasg answered Oct 25 '22 08:10