We are working with Spark 1.6
and we are trying to keep a global identity for similar events. There can be a few "groups" of events with an identical ID (in the example the ID is the number; the letters are added just for uniqueness). We know that some of these events are similar, so we are able to connect them. We want to keep something like:
Z -> 1, 2, 3
X -> 4
so that if more events with id 4 arrive in the future, we can assign X as their global identity.
Please check the example below for a better illustration.
Let's say we have some streaming data coming into a Spark job:
1a
1b
2c
2d
2e
3f
3g
3h
4i
Since event 1 is the first appearance, we want to assign 1 to Z.
Next, we learn that 1b and 2c are similar, so we want to keep a 2->1 mapping somewhere. The same goes for 2e and 3f, so we need the mapping 3->2. So for now we have three pairs: 1->Z, 2->1, 3->2.
And we want to create the "historical" path Z <- 1 <- 2 <- 3 by resolving each mapping transitively (as in the sketch after the list below). At the end, all of these events will have the ID Z:
1a -> Z
1b -> Z
2c -> Z
2d -> Z
2e -> Z
3f -> Z
3g -> Z
3h -> Z
4i -> X
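To make the intended resolution concrete, here is a minimal sketch of the logic we have in mind (an illustration, not production code): follow each direct mapping transitively until an id with no parent is reached.

// direct mappings as collected above: a child id points to its parent identity
val direct: Map[String, String] = Map("1" -> "Z", "2" -> "1", "3" -> "2", "4" -> "X")
// follow the chain until we reach an id with no parent: that id is the global identity
def resolve(id: String): String = direct.get(id).map(resolve).getOrElse(id)
// resolve("3") == "Z", resolve("4") == "X"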
We tried to use mapWithState, but the only thing we were able to produce were the direct mappings 2->1 and 3->2. With mapWithState we were not able to access the state of the "parent" while processing the current event: e.g. while handling event 3 (whose parent is 2), we could not look up 2->1, let alone 1->Z.
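For reference, a minimal sketch of the mapWithState shape we tried (assuming Int event ids, with the parent id as the value). It shows the limitation: the mapping function only receives the State of the current key, so the parent's state cannot be read from inside it.

import org.apache.spark.streaming.{State, StateSpec}
// key = event id, value = parent id, state = the parent stored for this key
val spec = StateSpec.function { (eventId: Int, parentId: Option[Int], state: State[Int]) =>
  parentId.foreach(state.update) // we can store 3 -> 2 here ...
  // ... but there is no way to read the state of key 2 (i.e. 2 -> 1) from this scope
  (eventId, state.getOption())
}
// pairStream: DStream[(Int, Int)] of (eventId, parentId) pairs
val stateStream = pairStream.mapWithState(spec)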
Is it possible to have some global mapping for this? We already tried accumulators and broadcast variables, but they do not look very suitable. And we were not able to replace the events with id 1 from the first mapping, or the events with id 2 from the second mapping, with Z.
If a new event 5 arrives and it is similar to 3h, for example, we again need to assign the mapping 5->Z.
What follows is a solution for the given problem, using a mutable reference to a 'state' RDD that we update with new results each time.
We use transform to tag the incoming event stream with the unique global id by performing a similarity join. This is a join "by hand": we take the cartesian product of the two datasets and compare each pair of entries.
Note that this is an expensive process. There are many parts that could be changed, depending on the specific characteristics of the expected stream. For example, we could replace the global state RDD with a local map and use map-side joins for a faster similarity join, but that very much depends on the expected cardinality of the set of unique ids.
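As a rough sketch of that alternative (assuming the set of (currentId, globalId) mappings fits in driver memory, and reusing the currentMappings, isSimilar and genGlobalId names from the snippet below):

// collect the small mapping table to the driver and broadcast it to every executor
val mappingsBC = sparkContext.broadcast(currentMappings.collectAsMap())
// map-side similarity join: each task scans the broadcast map locally,
// avoiding the shuffle caused by the cartesian product
val tagged = newEventIds.map { eventId =>
  val globalId = mappingsBC.value.collectFirst {
    case (currentId, gId) if isSimilar(currentId)(eventId) => gId
  }.getOrElse(genGlobalId())
  (eventId, globalId)
}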
This was trickier than I originally expected. Take it only as a starting point towards a more robust solution. For example, the union operation on the state RDD needs regular checkpointing to keep the DAG from growing beyond control.
(There's a lot of room for improvement, but that's beyond a reasonable effort to provide an answer.)
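A minimal sketch of that checkpointing, assuming an arbitrary interval of 10 batches and a hypothetical checkpoint directory:

sparkContext.setCheckpointDir("/tmp/global-id-checkpoints") // hypothetical path
var batchCount = 0L
// inside the transform, right after `states = newStates.union(states)`:
batchCount += 1
if (batchCount % 10 == 0) {
  states.checkpoint() // marks the RDD so its lineage is truncated
  states.count()      // force evaluation so the checkpoint is actually written
}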
Here I sketch the core of the solution; for the complete test notebook, see UniqueGlobalStateChains.snb.
// this mutable reference points to the `states` that we keep across iterations
@transient var states: RDD[(String, (Int, Long))] = sparkContext.emptyRDD
// we assume an incoming Event stream. Here we prepare it for the global id-process
@transient val eventsById = eventStream.map(event => (event.id, event))
@transient val groupedEvents = eventsById.groupByKey()
// this is the core of the solution.
// We transform the incoming events into tagged events.
// As a by-product, the mutable `states` reference will get updated with the latest state mapping.
// the "chain" of events can be reconstructed by ordering the states by timestamp
@transient val taggedEvents = groupedEvents.transform{ (events, currentTime) =>
  // for each global id, keep only the most recent (eventId, timestamp) entry
  val currentTransitions = states.reduceByKey{case (state1, state2) => Seq(state1, state2).maxBy{case (id, ts) => ts}}
  // invert the state to (currentId, globalId) so we can match it against incoming ids
  val currentMappings = currentTransitions.map{case (globalId, (currentId, maxTx)) => (currentId, globalId)}
  val newEventIds = events.keys // let's extract the ids of the incoming (grouped) events
  // similarity join "by hand": cartesian product + pair-wise comparison
  val similarityJoinMap = newEventIds.cartesian(currentMappings)
      .collect{case (eventId, (currentId, globalId)) if (isSimilar(currentId)(eventId)) => (eventId, globalId)}
      .collectAsMap
  // reuse the global id of a similar event, or generate a fresh one
  val newGlobalKeys = newEventIds.map(id => (id, similarityJoinMap.getOrElse(id, genGlobalId())))
  newGlobalKeys.cache() // avoid lazy evaluation generating multiple global ids
  val newTaggedEvents = events.join(newGlobalKeys).flatMap{case (eventId, (eventGroup, globalKey)) =>
    eventGroup.map(event => (event.id, event.payload, globalKey))
  }
  // record the new transitions and refresh the state RDD for the next batch
  val newStates = newGlobalKeys.map{case (eventId, globalKey) => (globalKey, (eventId, currentTime.milliseconds))}
  states.unpersist(false)
  states = newStates.union(states)
  states.cache()
  newTaggedEvents
}
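The snippet relies on two helpers, isSimilar and genGlobalId, defined in the notebook. Plausible placeholder definitions, consistent with the sample run below (an event id n+1 is matched against a stored id n, and global ids look like gen-NNNN), could be:

// hypothetical similarity: an incoming id matches a stored id if it is its successor
def isSimilar(currentId: Int)(eventId: Int): Boolean = eventId == currentId + 1
// hypothetical generator of fresh global ids in the "gen-NNNN" style seen below
def genGlobalId(): String = "gen-" + scala.util.Random.nextInt(10000)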
Given this input sequence:
"1|a,1|b,3|c", "2|d,2|e,2|f", "3|g,3|h,3|i,4|j", "5|k", "4|f,1|g", "6|h"
We get:
Tagged Events with a global id:
---
1|a: gen-4180,1|b: gen-4180,3|c: gen-5819
---
2|d: gen-4180,2|e: gen-4180,2|f: gen-4180
---
3|g: gen-4180,3|h: gen-4180,3|i: gen-4180,4|j: gen-5819
---
5|k: gen-5819
---
1|g: gen-2635,4|f: gen-4180
---
6|h: gen-5819
And we can reconstruct the chain of events that derives from each global id:
gen-4180: 1<-2<-3<-4
gen-2635: 1
gen-5819: 3<-4<-5<-6
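For completeness, here is a minimal sketch of that reconstruction from the states RDD we keep above: group the state entries by global id and order each group's event ids by timestamp.

val chains = states
  .groupByKey() // all (eventId, timestamp) entries recorded for each global id
  .mapValues { entries =>
    entries.toSeq
           .sortBy { case (_, ts) => ts } // oldest transition first
           .map { case (id, _) => id }
           .mkString("<-")
  }
chains.collect().foreach { case (globalId, chain) => println(s"$globalId: $chain") }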