I have a case where a sink (or an intermediate flow) can produce side-effect data that has to be pushed back (or appended) to the source. Is there a way to accomplish this using the stream DSL? I could use a blocking queue or something similar to create a source and then push data directly to that queue, but that breaks the abstractions of streams. Perhaps there's a better solution I don't know about?
As Viktor said, you can use circular graphs.
For example, the partition stage lets you route selected elements of your stream to a particular output.
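import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, RunnableGraph, Sink, Source}

// Even numbers go to output 0 (the feedback edge), odd numbers to output 1 (the sink).
def partitionFunction(i: Int): Int = if (i % 2 == 0) 0 else 1

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val in = Source(1 to 10)
  val out = Sink.foreach[Int](println)
  val addOne = Flow[Int].map(_ + 1)
  val partition = builder.add(Partition[Int](2, partitionFunction))
  val merge = builder.add(Merge[Int](2))

  in ~> merge ~> partition
  partition.out(0) ~> addOne ~> merge // feedback edge: back into the merge
  partition.out(1) ~> out
  ClosedShape
})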
In this example, the source (in) is connected to one input of the merge. The integers then pass through the partition stage, which separates even from odd numbers.
The even numbers go through the addOne flow and then into the second input of the merge (which sends them back to the partition stage again).
The odd numbers go directly to the sink (out).
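This lets you feed some of the values back into the graph, but it can easily lead to elements circulating forever. This is why the addOne stage is important here: it turns every even number into an odd one, so each element goes around the loop at most once. Without it, the even numbers would be trapped in the graph.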
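To make the termination argument concrete, here is a minimal sketch in plain Scala (no Akka) that replays the same routing rule sequentially. The FeedbackTrace object, its run method and the maxSteps budget are hypothetical names introduced only for this illustration. The real graph runs asynchronously and the merge may interleave its inputs differently, so the actual print order can differ from this trace.

```scala
import scala.annotation.tailrec
import scala.collection.immutable.Queue

object FeedbackTrace {
  // Same routing rule as in the graph: 0 = feed back, 1 = emit to the sink.
  def partitionFunction(i: Int): Int = if (i % 2 == 0) 0 else 1

  /** Replays the loop one element at a time.
    * Returns the emitted elements and whether the step budget ran out
    * while elements were still circulating.
    */
  def run(input: Seq[Int], feedback: Int => Int, maxSteps: Int): (Vector[Int], Boolean) = {
    @tailrec
    def loop(pending: Queue[Int], emitted: Vector[Int], steps: Int): (Vector[Int], Boolean) =
      pending.dequeueOption match {
        case None                         => (emitted, false) // loop drained
        case Some(_) if steps >= maxSteps => (emitted, true)  // still circulating
        case Some((x, rest)) =>
          if (partitionFunction(x) == 0)
            loop(rest.enqueue(feedback(x)), emitted, steps + 1) // back into the loop
          else
            loop(rest, emitted :+ x, steps + 1)                 // out to the sink
      }
    loop(Queue(input: _*), Vector.empty, 0)
  }
}
```

With the addOne step, FeedbackTrace.run(1 to 10, _ + 1, 100) drains after 15 steps and emits 1, 3, 5, 7, 9, 3, 5, 7, 9, 11. With identity as the feedback step, the even numbers never leave: only 1, 3, 5, 7, 9 are emitted and the budget is exhausted.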