
Akka Streams: State in a flow

I want to read multiple large files with Akka Streams and process each line. Imagine that each line consists of an (identifier -> value) pair. If a new identifier is found, I want to save both it and its value in the database; otherwise, if the identifier has already been seen while processing the stream of lines, I want to save only the value. For that, I think I need some kind of recursive stateful flow that keeps the identifiers already found in a Map. I imagine this flow would receive a pair of (newLine, contextWithIdentifiers).

I've just started looking into Akka Streams. I can manage the stateless processing myself, but I have no idea how to keep the contextWithIdentifiers. I'd appreciate any pointers in the right direction.

Asked by vicaba on Jun 18 '16 at 22:06



2 Answers

Maybe something like statefulMapConcat can help you:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.util.Random._
import scala.math.abs
import scala.concurrent.ExecutionContext.Implicits.global

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

//encapsulating your input
case class IdentValue(id: Int, value: String)
//some random generated input
val identValues = List.fill(20)(IdentValue(abs(nextInt()) % 5, "valueHere"))

val stateFlow = Flow[IdentValue].statefulMapConcat{ () =>
  //state with already processed ids
  var ids = Set.empty[Int]
  identValue => if (ids.contains(identValue.id)) {
    //save value to DB
    println(identValue.value)
    List(identValue)
  } else {
    //save both to database
    println(identValue)
    ids = ids + identValue.id
    List(identValue)
  }
}

Source(identValues)
  .via(stateFlow)
  .runWith(Sink.seq)
  // Future.onSuccess is deprecated since Scala 2.12 and removed in 2.13; foreach runs only on success
  .foreach(processed => println(processed))
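
As a possible refinement of the idea above, the decision "new identifier or already seen" can be separated from the side effect of writing to the database. The following is only a minimal sketch in plain Scala with no Akka dependency, so it can be tried in isolation. All names in it (Write, InsertIdentifierAndValue, InsertValueOnly, SeenTracker) are hypothetical and not part of Akka or of the code above, and it assumes lines have already been parsed into (Int, String) pairs.

```scala
// Hypothetical command types describing what should be written to the database.
sealed trait Write
final case class InsertIdentifierAndValue(id: Int, value: String) extends Write
final case class InsertValueOnly(id: Int, value: String) extends Write

object SeenTracker {
  // Same shape as the argument of statefulMapConcat: a factory that
  // creates fresh state and returns the per-element function.
  def classifier(): ((Int, String)) => List[Write] = {
    var seen = Set.empty[Int] // identifiers encountered so far
    val classify: ((Int, String)) => List[Write] = {
      case (id, value) if seen.contains(id) =>
        // Identifier already seen: only the value needs saving
        List(InsertValueOnly(id, value))
      case (id, value) =>
        // First occurrence: remember the identifier and save both
        seen += id
        List(InsertIdentifierAndValue(id, value))
    }
    classify
  }
}
```

In a stream this could presumably be plugged in as Flow[(Int, String)].statefulMapConcat(() => SeenTracker.classifier()), with the actual database writes moved to a later stage that pattern-matches on the emitted command. Keeping the stateful stage free of I/O makes it easy to test without a database.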
Answered by rincewind on Oct 05 '22 at 08:10


A few years later, here is an implementation I wrote if you only need a 1-to-1 mapping (not 1-to-N):

import akka.stream.stage.{GraphStage, GraphStageLogic}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

object StatefulMap {
  def apply[T, O](converter: => T => O) = new StatefulMap[T, O](converter)
}

class StatefulMap[T, O](converter: => T => O) extends GraphStage[FlowShape[T, O]] {
  val in = Inlet[T]("StatefulMap.in")
  val out = Outlet[O]("StatefulMap.out")
  val shape = FlowShape.of(in, out)

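  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // Evaluate the by-name converter once per materialization, so each running stream gets its own state
    val f = converter
    // On each incoming element, apply the function and emit the result
    setHandler(in, () => push(out, f(grab(in))))
    // On downstream demand, request the next element from upstream
    setHandler(out, () => pull(in))
  }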
}
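
The part that keeps state from being shared is the by-name parameter (converter: => T => O): the expression passed in is re-evaluated every time createLogic runs. The sketch below illustrates just that mechanism in plain Scala, without Akka. Blueprint and ByNameDemo are hypothetical stand-ins for the stage and are not part of the implementation above.

```scala
object ByNameDemo {
  // Stateful function: returns 1, 2, 3, ... on successive calls.
  final class Counter extends (Any => Int) {
    private var count = 0
    override def apply(x: Any): Int = { count += 1; count }
  }

  // Hypothetical stand-in for a GraphStage: it holds the by-name expression
  // and evaluates it once per "materialization".
  final class Blueprint[T, O](converter: => T => O) {
    def materialize(): T => O = converter
  }

  def main(args: Array[String]): Unit = {
    val blueprint = new Blueprint[Any, Int](new Counter)
    val first = blueprint.materialize()  // evaluates `new Counter`
    val second = blueprint.materialize() // evaluates `new Counter` again
    println(first("a"))  // 1
    println(first("b"))  // 2
    println(second("c")) // 1, because `second` has its own Counter
  }
}
```

If the parameter were a plain T => O instead of by-name, both calls to materialize() would return the same Counter instance and the state would be shared.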

Test (and demo):

  behavior of "StatefulMap"

  class Counter extends (Any => Int) {
    var count = 0

    override def apply(x: Any): Int = {
      count += 1
      count
    }
  }

  it should "not share state among substreams" in {
    val result = await {
      Source(0 until 10)
        .groupBy(2, _ % 2)
        .via(StatefulMap(new Counter()))
        .fold(Seq.empty[Int])(_ :+ _)
        .mergeSubstreams
        .runWith(Sink.seq)
    }
    result.foreach(_ should be(1 to 5))
  }
Answered by ig-dev on Oct 05 '22 at 06:10