Spark Streaming groupByKey and updateStateByKey implementation

I am trying to run stateful Spark Streaming computations over (fake) Apache web server logs read from Kafka. The goal is to "sessionize" the web traffic, similar to this blog post.

The only difference is that I want to "sessionize" each page the IP hits, instead of the entire session. I was able to do this reading from a file of fake web traffic using Spark in batch mode, but now I want to do it in a streaming context.

Log files are read from Kafka and parsed into K/V pairs of (String, (String, Long, Long)) or

(IP, (requestPage, time, time)).
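For illustration, the Kafka-to-pairs step might look roughly like the sketch below; the topic name, ZooKeeper address, batch interval, and simplified one-token-per-field log format are assumptions, not details from the question:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("Sessionize")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Read raw log lines from Kafka; createStream yields (key, message) pairs,
    // and only the message (the log line) is needed here.
    val lines = KafkaUtils.createStream(ssc, "localhost:2181", "sessionizer", Map("weblogs" -> 1))
      .map(_._2)

    // Assumes each line is "<ip> <page> <epochMillis>"; real Apache logs would
    // need a Common Log Format regex and date parsing instead.
    val ipTimeStamp = lines.map(_.split(" ")).filter(_.length == 3).map {
      case Array(ip, page, time) => (ip, (page, time.toLong, time.toLong))
    }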

I then call groupByKey() on this K/V pair. In batch mode, this would produce a:

(String, CompactBuffer((String, Long, Long), ...)) or

(IP, CompactBuffer((requestPage, time, time), ...))

In a StreamingContext, it produces a:

(String, ArrayBuffer((String, Long, Long), ...)), like so:

(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))

However, as the next microbatch (DStream) arrives, this information is discarded.

Ultimately, what I want is for that ArrayBuffer to fill up over time as a given IP continues to interact, so that I can run some computations on its data to "sessionize" the page time.

I believe the operator to make that happen is updateStateByKey. I'm having some trouble with this operator (I'm new to both Spark and Scala); any help is appreciated.

Thus far:

    import scala.collection.mutable.ArrayBuffer

    val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)

    def updateGroupByKey(
        a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
        b: Option[(String, ArrayBuffer[(String, Long, Long)])]
    ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
      // ??? -- this body is what I can't figure out
    }
asked Dec 17 '14 by plamb

2 Answers

I think you are looking for something like this:

    def updateGroupByKey(
        newValues: Seq[(String, ArrayBuffer[(String, Long, Long)])],
        currentValue: Option[(String, ArrayBuffer[(String, Long, Long)])]
    ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
      // Collect the buffers arriving in this batch
      val buffs: Seq[ArrayBuffer[(String, Long, Long)]] = for (v <- newValues) yield v._2
      // Prepend the previously accumulated buffer, if any
      // (+: rather than ::, since buffs is a Seq, not a List)
      val buffs2 = if (currentValue.isEmpty) buffs else currentValue.get._2 +: buffs
      if (buffs2.isEmpty) None else {
        // Keep the existing key, or take it from the first new value
        val key = if (currentValue.isEmpty) newValues(0)._1 else currentValue.get._1
        // Flatten everything into a single ArrayBuffer
        Some((key, buffs2.foldLeft(new ArrayBuffer[(String, Long, Long)])((v, a) => v ++ a)))
      }
    }
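Note that the previously accumulated buffer is prepended before everything is flattened, so an IP's older hits stay in front of the hits from the newest batch.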
answered by Gábor Bakos

Gábor's answer got me started down the right path, but here is a version that produces the expected output.

First, for the output I want:

(100.40.49.235,List((/,1418934075000,1418934075000), (/,1418934105000,1418934105000), (/contactus.html,1418934174000,1418934174000)))

It turns out I don't need groupByKey() at all. updateStateByKey already accumulates the incoming values into a Seq, so adding groupByKey is unnecessary (and expensive); experienced Spark users strongly advise against using groupByKey.

Here is the code that worked:

    def updateValues(
        newValues: Seq[(String, Long, Long)],
        currentValue: Option[Seq[(String, Long, Long)]]
    ): Option[Seq[(String, Long, Long)]] = {
      // Append this batch's values to whatever has been accumulated so far
      Some(currentValue.getOrElse(Seq.empty) ++ newValues)
    }


val grouped = ipTimeStamp.updateStateByKey(updateValues)

Here updateStateByKey is passed a function (updateValues) that receives the values arriving in the current batch (newValues) along with an Option holding the state accumulated so far for that key (currentValue), and returns the combination of the two. getOrElse is required because currentValue is empty the first time a key is seen. Credit to https://twitter.com/granturing for the correct code.
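One practical note: updateStateByKey requires checkpointing to be enabled so Spark can persist the state between batches, and returning None from the update function would drop a key from the state entirely (the code above always returns Some, so each IP's history keeps growing). A minimal wiring sketch, assuming ssc is the StreamingContext and using an assumed local checkpoint path:

    // updateStateByKey needs a checkpoint directory to persist state between batches
    ssc.checkpoint("/tmp/spark-checkpoint") // assumed path; use HDFS or similar in production

    val grouped = ipTimeStamp.updateStateByKey(updateValues)
    grouped.print() // inspect the accumulating per-IP (page, time, time) sequences

    ssc.start()
    ssc.awaitTermination()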

answered by plamb