I want to read multiple big files using Akka Streams to process each line. Imagine that each key consists of an (identifier -> value)
. If a new identifier is found, I want to save it and its value in the database; otherwise, if the identifier has already been found while processing the stream of lines, I want to save only the value. For that, I think that I need some kind of recursive stateful flow in order to keep the identifiers that have already been found in a Map
. I think I'd receive in this flow a pair of (newLine, contextWithIdentifiers)
.
I've just started to look into Akka Streams. I guess I can manage myself to do the stateless processing stuff but I have no clue about how to keep the contextWithIdentifiers
. I'd appreciate any pointers to the right direction.
Unlike heavier “streaming data processing” frameworks, Akka Streams are neither “deployed” nor automatically distributed.
Actor Materializer Lifecycle. The Materializer is a component that is responsible for turning the stream blueprint into a running stream and emitting the “materialized value”.
Akka streams consist of 3 major components in it – Source, Flow, Sink – and any non-cyclical stream consist of at least 2 components Source, Sink and any number of Flow element. Here we can say Source and Sink are the special cases of Flow.
A Flow is a set of stream processing steps that has one open input and one open output. Source Flow.scala.
Maybe something like statefulMapConcat
can help you:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.util.Random._
import scala.math.abs
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
//encapsulating your input
case class IdentValue(id: Int, value: String)
//some random generated input
val identValues = List.fill(20)(IdentValue(abs(nextInt()) % 5, "valueHere"))
val stateFlow = Flow[IdentValue].statefulMapConcat{ () =>
//state with already processed ids
var ids = Set.empty[Int]
identValue => if (ids.contains(identValue.id)) {
//save value to DB
println(identValue.value)
List(identValue)
} else {
//save both to database
println(identValue)
ids = ids + identValue.id
List(identValue)
}
}
Source(identValues)
.via(stateFlow)
.runWith(Sink.seq)
.onSuccess { case identValue => println(identValue) }
A few years later, here is an implementation I wrote if you only need a 1-to-1 mapping (not 1-to-N):
import akka.stream.stage.{GraphStage, GraphStageLogic}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
object StatefulMap {
def apply[T, O](converter: => T => O) = new StatefulMap[T, O](converter)
}
class StatefulMap[T, O](converter: => T => O) extends GraphStage[FlowShape[T, O]] {
val in = Inlet[T]("StatefulMap.in")
val out = Outlet[O]("StatefulMap.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
val f = converter
setHandler(in, () => push(out, f(grab(in))))
setHandler(out, () => pull(in))
}
}
Test (and demo):
behavior of "StatefulMap"
class Counter extends (Any => Int) {
var count = 0
override def apply(x: Any): Int = {
count += 1
count
}
}
it should "not share state among substreams" in {
val result = await {
Source(0 until 10)
.groupBy(2, _ % 2)
.via(StatefulMap(new Counter()))
.fold(Seq.empty[Int])(_ :+ _)
.mergeSubstreams
.runWith(Sink.seq)
}
result.foreach(_ should be(1 to 5))
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With