
Akka stream - connect sink to source?

I have a case where a sink (or an intermediate flow) produces side-effect data that has to be pushed back (or appended) to the Source. Is there a way to accomplish this using the stream DSL? I could build a source on top of a blocking queue or something similar and push data directly into that queue, but that breaks the stream abstraction. Is there a better solution I don't know about?

jdevelop asked Mar 11 '23 05:03



1 Answer

As Viktor said, you can use circular graphs.

For example, the Partition stage lets you route selected elements of your stream to a separate output, which can then be wired back into the graph.

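  import akka.stream.ClosedShape
  import akka.stream.scaladsl._

  // Returns the index of the Partition output: 0 for even numbers, 1 for odd ones.
  def partitionFunction(i: Int): Int = if (i % 2 == 0) 0 else 1

  val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val in = Source(1 to 10)
    val out = Sink.foreach[Int](println)

    val addOne = Flow[Int].map(_ + 1)

    val partition = builder.add(Partition[Int](2, partitionFunction))
    val merge = builder.add(Merge[Int](2))

    // Main path: the source enters the merge, which feeds the partition.
                            in ~> merge ~> partition
    // Feedback path: even numbers are incremented and merged back in.
    partition.out(0) ~> addOne ~> merge
    // Exit path: odd numbers leave the loop.
    partition.out(1) ~> out

    ClosedShape
  })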

In this example, the source in is connected to one input of the merge. The integers then pass through the partition stage, which separates even numbers from odd ones.

The even numbers go through the addOne flow and then into the second input of the merge, which sends them back to the partition stage.

The odd numbers go directly to the sink out.
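To make the routing concrete, here is a minimal plain-Scala model of the loop described above, with no Akka involved. It only tracks where each element ends up; the object name `LoopTrace` and the helper `route` are hypothetical, and the real graph is free to interleave elements, so the order in which the actual stream prints values may differ.

```scala
import scala.annotation.tailrec

object LoopTrace {
  // Same routing rule as in the answer: 0 = feedback branch, 1 = sink branch.
  def partitionFunction(i: Int): Int = if (i % 2 == 0) 0 else 1

  // Hypothetical helper: follows one element around the loop and returns
  // (value that reaches the sink, number of trips through the feedback branch).
  @tailrec
  def route(value: Int, trips: Int = 0): (Int, Int) =
    if (partitionFunction(value) == 1) (value, trips)
    else route(value + 1, trips + 1) // addOne, then back into the merge

  def main(args: Array[String]): Unit =
    (1 to 10).foreach { i =>
      val (emitted, trips) = route(i)
      println(s"$i -> sink receives $emitted after $trips feedback trip(s)")
    }
}
```

In this model every even input makes exactly one trip through the feedback branch, so the sink receives the values 1, 3, 3, 5, 5, 7, 7, 9, 9 and 11 in some order.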

This lets you feed some of the values back into the graph, but it can easily lead to an endless loop. This is why the addOne stage is important here: without it, the even numbers would stay even and remain trapped in the graph.
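The termination argument can be checked with a small plain-Scala sketch that caps the number of feedback trips. It is a model of the routing logic only, not of Akka's scheduling or back-pressure; the names `FeedbackTermination` and `exitsWithin` and the cap of 1000 trips are assumptions made for illustration.

```scala
object FeedbackTermination {
  // Same routing rule as in the answer: 0 = feedback branch, 1 = sink branch.
  def partitionFunction(i: Int): Int = if (i % 2 == 0) 0 else 1

  // Hypothetical helper: returns Some(value) if the element leaves the loop
  // within maxTrips feedback trips, None if it is still circulating.
  def exitsWithin(maxTrips: Int, feedback: Int => Int)(start: Int): Option[Int] = {
    var value = start
    var trips = 0
    while (partitionFunction(value) == 0 && trips < maxTrips) {
      value = feedback(value) // one pass through the feedback flow
      trips += 1
    }
    if (partitionFunction(value) == 1) Some(value) else None
  }

  def main(args: Array[String]): Unit = {
    println(exitsWithin(1000, _ + 1)(4))    // Some(5): addOne turns 4 into an odd number
    println(exitsWithin(1000, identity)(4)) // None: without addOne, 4 stays even
    println(exitsWithin(1000, _ + 2)(4))    // None: a flow that keeps values even never releases them
  }
}
```

The point of the sketch is that the feedback flow must eventually change an element so that the partition function routes it to the exit branch; any flow that leaves the routing decision unchanged keeps the element in the cycle.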

Jacob answered Apr 08 '23 20:04
