I am trying to run stateful Spark Streaming computations over (fake) Apache web server logs read from Kafka. The goal is to "sessionize" the web traffic, similar to this blog post. The only difference is that I want to "sessionize" each page the IP hits, instead of the entire session. I was able to do this with Spark in batch mode, reading from a file of fake web traffic, but now I want to do it in a streaming context.
Log files are read from Kafka and parsed into K/V pairs of (String, (String, Long, Long)), or (IP, (requestPage, time, time)).
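For concreteness, a minimal sketch of that parsing step. The regex, date format, and helper name parseLine are my assumptions about the fake log format, not the actual job code:

import java.text.SimpleDateFormat
import java.util.Locale

// Hypothetical parser for a Common Log Format line, e.g.
// 183.196.254.131 - - [17/Dec/2014:20:56:02 +0000] "GET /test.php HTTP/1.1" 200 512
val logPattern = """^(\S+) \S+ \S+ \[([^\]]+)\] "(?:GET|POST) (\S+).*""".r

def parseLine(line: String): Option[(String, (String, Long, Long))] = line match {
  case logPattern(ip, timestamp, page) =>
    val fmt = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
    val millis = fmt.parse(timestamp).getTime
    Some((ip, (page, millis, millis))) // time appears twice: session start and end
  case _ => None // drop lines that do not parse
}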
I then call groupByKey() on these K/V pairs. In batch mode, this would produce a:
(String, CollectionBuffer((String, Long, Long), ...))
or
(IP, CollectionBuffer((requestPage, time, time), ...))
In a StreamingContext, it produces a:
(String, ArrayBuffer((String, Long, Long), ...))
like so:
(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
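A pipeline along these lines produces that output. The Kafka connection details are illustrative, parseLine is the hypothetical parser sketched above, and this uses the receiver-based Kafka API from the Spark 1.x era:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("sessionize")
val ssc = new StreamingContext(conf, Seconds(10))

// The values of the Kafka stream are the raw log lines
val lines = KafkaUtils.createStream(ssc, "zkhost:2181", "sessionizer", Map("weblogs" -> 1))
  .map(_._2)

val ipTimeStamp = lines.flatMap(l => parseLine(l).toSeq)
val grouped = ipTimeStamp.groupByKey()
grouped.print()

ssc.start()
ssc.awaitTermination()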
However, as the next microbatch (DStream) arrives, this information is discarded. Ultimately what I want is for that ArrayBuffer to fill up over time as a given IP continues to interact, and to run some computations on its data to "sessionize" the page time.
I believe the operator to make that happen is updateStateByKey. I'm having some trouble with this operator (I'm new to both Spark & Scala); any help is appreciated.
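As far as I can tell, updateStateByKey takes an update function of shape (Seq[V], Option[S]) => Option[S], where the Seq holds this batch's values for a key and the Option holds the running state. A minimal running-count illustration of that contract (just the shape, not my sessionization code):

// newValues: this key's values from the current micro-batch
// state:     the value carried over from previous batches, if any
def updateCount(newValues: Seq[Int], state: Option[Int]): Option[Int] =
  Some(state.getOrElse(0) + newValues.sum)

// pairs: DStream[(String, Int)]  =>  running totals per key
// val totals = pairs.updateStateByKey(updateCount)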
Thus far:
val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)

def updateGroupByKey(
    a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
    b: Option[(String, ArrayBuffer[(String, Long, Long)])]
): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
  ??? // this is the part I can't figure out
}
I think you are looking for something like this:
import scala.collection.mutable.ArrayBuffer

def updateGroupByKey(
    newValues: Seq[(String, ArrayBuffer[(String, Long, Long)])],
    currentValue: Option[(String, ArrayBuffer[(String, Long, Long)])]
): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
  // Collect the buffers arriving in this batch, plus the buffer held in state (if any)
  val buffs: Seq[ArrayBuffer[(String, Long, Long)]] = for (v <- newValues) yield v._2
  val buffs2 = if (currentValue.isEmpty) buffs else currentValue.get._2 +: buffs
  // Fold everything into a single buffer, keeping the key from the state or the first new value
  if (buffs2.isEmpty) None
  else {
    val key = if (currentValue.isEmpty) newValues(0)._1 else currentValue.get._1
    Some((key, buffs2.foldLeft(new ArrayBuffer[(String, Long, Long)])((acc, b) => acc ++ b)))
  }
}
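One caveat: updateStateByKey only works when checkpointing is enabled on the StreamingContext, so the wiring needs something like the following (the checkpoint path is a placeholder):

// State is persisted across micro-batches, so a checkpoint directory is required
ssc.checkpoint("hdfs:///tmp/sessionize-checkpoints")

val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
grouped.print()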
Gabor's answer got me started down the right path, but here is an answer that produces the expected output.
First, for the output I want:
(100.40.49.235,List((/,1418934075000,1418934075000), (/,1418934105000,1418934105000), (/contactus.html,1418934174000,1418934174000)))
It turns out I don't need groupByKey() at all. updateStateByKey already accumulates the values into a Seq, so adding groupByKey is unnecessary (and expensive); the Spark community strongly discourages using groupByKey anyway.
Here is the code that worked:
def updateValues(newValues: Seq[(String, Long, Long)],
                 currentValue: Option[Seq[(String, Long, Long)]]
                ): Option[Seq[(String, Long, Long)]] = {
  Some(currentValue.getOrElse(Seq.empty) ++ newValues)
}
val grouped = ipTimeStamp.updateStateByKey(updateValues)
Here updateStateByKey is passed a function (updateValues) that receives the values arriving in the current micro-batch (newValues) as well as an Option holding the state accumulated over time (currentValue), and returns the combination of the two. getOrElse is required because currentValue may occasionally be empty, for example the first time a key appears. Credit to https://twitter.com/granturing for the correct code.
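From there, collapsing the accumulated hits into per-page sessions could look something like this sketch (my own assumption about the final step, not part of the credited code):

// For each IP, reduce the accumulated hits to one entry per page:
// (page, earliest time seen, latest time seen)
val sessions = grouped.mapValues { hits =>
  hits.groupBy { case (page, _, _) => page }
      .map { case (page, ts) => (page, ts.map(_._2).min, ts.map(_._3).max) }
      .toSeq
}
sessions.print()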