Say that I have two sources:
val ticks = Source(1 to 10)
val values = Source[Int](Seq(3,4,4,7,8,8,8,8,9).to[collection.immutable.Iterable])
I'd like to create a Graph[...] processing step in Akka Streams that, based on the current value of the ticks stream, consumes as many elements as possible from the values stream. For instance, when a tick matches, I want to return all the matching elements from the second source; otherwise it should keep ticking, resulting in output like:
(1, None)
(2, None)
(3, Some(Seq(3)))
(4, Some(Seq(4, 4)))
(5, None)
(6, None)
(7, Some(Seq(7)))
(8, Some(Seq(8,8,8,8)))
(9, Some(Seq(9)))
(10, None)
How would you implement this behaviour?
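To pin down the expected result, here is a minimal non-streaming sketch on plain collections. It is only a model of the output above, not an Akka Streams solution, and the names `ExpectedOutput` and `pair` are made up for illustration.

```scala
// Plain-collections model of the desired pairing; no Akka involved.
object ExpectedOutput {
  // Pairs every tick with all values equal to it, or None if there are none.
  def pair(ticks: Seq[Int], values: Seq[Int]): Seq[(Int, Option[Seq[Int]])] = {
    // Group equal values together, keyed by the value itself.
    val grouped: Map[Int, Seq[Int]] = values.groupBy(identity)
    ticks.map(t => (t, grouped.get(t)))
  }

  def main(args: Array[String]): Unit =
    pair(1 to 10, Seq(3, 4, 4, 7, 8, 8, 8, 8, 9)).foreach(println)
}
```

Unlike this model, a streaming version cannot see all values up front, so it has to rely on the values arriving in sorted order.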
By default, Akka Streams fuses stream operators. This means that the processing steps of a flow or stream can be executed within the same Actor, which has two consequences: passing elements from one operator to the next is a lot faster between fused operators because the asynchronous messaging overhead is avoided, and fused operators do not run in parallel with each other, so each fused part uses at most one CPU core.
Akka Streams has three major components: Source, Flow and Sink.
A Flow is a set of stream processing steps that has exactly one open input and one open output.
Unlike heavier “streaming data processing” frameworks, Akka Streams are neither “deployed” nor automatically distributed.
I'd recommend you take a look at the Akka Stream Documentation on this subject: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-graphs.html
According to the site, you can implement a GraphStage like this:
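import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic}
import scala.collection.immutable
final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] {
  val in = Inlet[E]("AccumulateWhileUnchanged.in")
  val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")
  override def shape = FlowShape(in, out)
  // Placeholder: the buffering and emit handlers still have to be written here.
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ???
}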
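For a fuller picture, here is a sketch of the same stage with the handler logic filled in. It is my own reading of the pattern rather than a verbatim copy of the documentation, and it assumes Akka 2.5 or later on the classpath. It buffers consecutive equal elements and emits each run as one Seq when the value changes or the upstream finishes.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.immutable

// Sketch: groups runs of equal consecutive elements into one Seq per run.
final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] {
  val in: Inlet[E] = Inlet[E]("AccumulateWhileUnchanged.in")
  val out: Outlet[immutable.Seq[E]] = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")
  override def shape: FlowShape[E, immutable.Seq[E]] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Elements of the current run; all of them are equal to each other.
      private var buffer = Vector.empty[E]

      override def onPush(): Unit = {
        val next = grab(in)
        if (buffer.isEmpty || buffer.head == next) {
          // Same run: keep accumulating and ask upstream for more.
          buffer :+= next
          pull(in)
        } else {
          // Value changed: emit the finished run and start a new one.
          val run = buffer
          buffer = Vector(next)
          push(out, run)
        }
      }

      // Downstream demand is forwarded upstream.
      override def onPull(): Unit = pull(in)

      override def onUpstreamFinish(): Unit = {
        // Flush the last run before completing.
        if (buffer.nonEmpty) emit(out, buffer)
        completeStage()
      }

      setHandlers(in, out, this)
    }
}
```

With the values source from the question, `values.via(new AccumulateWhileUnchanged[Int])` should emit the runs Seq(3), Seq(4, 4), Seq(7), Seq(8, 8, 8, 8) and Seq(9). Pairing those runs with the ticks is a separate step that this stage does not cover.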
There is also a blog post on this subject: http://blog.kunicki.org/blog/2016/07/20/implementing-a-custom-akka-streams-graph-stage/
Hope this helps :)