Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How does one control the flow of an Akka Stream based on another stream

Tags:

akka-stream

Say that I have two sources:

val ticks = Source(1 to 10)
val values = Source[Int](Seq(3,4,4,7,8,8,8,8,9).to[collection.immutable.Iterable])

I'd like to create a Graph[...] processing step in Akka Stream that based on the current value of the ticks streams it consumes as much as possible in the values stream. So for instance, when values match I want to return all the elements that match in the second source, otherwise keep ticking resulting in an output like:

(1, None)
(2, None)
(3, Some(Seq(3)))
(4, Some(Seq(4, 4)))
(5, None)
(6, None)
(7, Some(Seq(7)))
(8, Some(Seq(8,8,8,8)))
(9, Some(Seq(9)))
(10, None)

How would you implement this behaviour?

like image 471
tonicebrian Avatar asked Nov 23 '16 14:11

tonicebrian


People also ask

How does Akka stream work?

By default, Akka Streams will fuse the stream operators. This means that the processing steps of a flow or stream can be executed within the same Actor and has two consequences: passing elements from one operator to the next is a lot faster between fused operators due to avoiding the asynchronous messaging overhead.

Which are the 3 main components in a Akka stream?

Akka streams consist of three major components in it – Source, Flow and Sink.

What is flow in Akka stream?

A Flow is a set of stream processing steps that has one open input and one open output. Source Flow.scala.

Is Akka streams distributed?

Unlike heavier “streaming data processing” frameworks, Akka Streams are neither “deployed” nor automatically distributed.


1 Answers

I'd recommend you take a look at the Akka Stream Documentation on this subject: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-graphs.html

According to the site, you can implement a GraphStage like this:

final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

val in = Inlet[E]("AccumulateWhileUnchanged.in")
val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

override def shape = FlowShape(in, out)
}

There is also a blog post on this subject: http://blog.kunicki.org/blog/2016/07/20/implementing-a-custom-akka-streams-graph-stage/

Hope this helps :)

like image 60
GJZ Avatar answered Oct 11 '22 16:10

GJZ