
How can I pipe the output of an Akka Streams Merge to another Flow?

Tags: stream, scala, akka

I'm playing with Akka Streams, and have figured out most of the basics, but I'm not clear on how to take the results of a Merge and do further operations (map, filter, fold, etc) on it.

I'd like to modify the following code so that, instead of piping the merge straight to a sink, I can manipulate the data further.

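// Imports and ActorSystem as assumed for Akka Streams 0.11
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl._

implicit val system = ActorSystem("example")
implicit val materializer = FlowMaterializer()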

val items_a = Source(List(10,20,30,40,50))
val items_b = Source(List(60,70,80,90,100))
val sink = ForeachSink(println)

val materialized = FlowGraph { implicit builder =>
  import FlowGraphImplicits._
  val merge = Merge[Int]("m1")
  items_a ~> merge
  items_b ~> merge ~> sink
}.run()

I guess my primary problem is that I can't figure out how to make a flow component that doesn't have a source, and I can't figure out how to do a merge without using the special Merge object and the ~> syntax.

EDIT: This question and its answer were written for, and worked with, Akka Streams 0.11.

Rich Henry asked Dec 25 '22 01:12


1 Answer

If you don't need the semantics of Merge, where elements from either input go downstream in no guaranteed order, you could use concat on Source instead, like so:

items_a.concat(items_b).map(_ * 2).map(_.toString).foreach(println)

The difference is that with concat all elements of items_a flow downstream before any element of items_b. If you really need the behavior of Merge, you could consider something like the following (keep in mind that you will eventually need a sink, but you can certainly apply additional transforms after the merge):

val items_a = Source(List(10,20,30,40,50))
val items_b = Source(List(60,70,80,90,100))

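val sink = ForeachSink[Double](println)
// Flow[Int] starts a Flow with no Source attached; .to(sink) turns it into a Sink that accepts Int
val transform = Flow[Int].map(_ * 2).map(_.toDouble).to(sink)

// Requires the implicit FlowMaterializer from the question to be in scope
val materialized = FlowGraph { implicit builder =>
  import FlowGraphImplicits._
  val merge = Merge[Int]("m1")
  items_a ~> merge
  items_b ~> merge ~> transform
}.run()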

In this example, Flow[Int] from the Flow companion object creates a Flow that has no Source attached. After adding the transformations to it, I attach it downstream of the merge point to get the additional processing.
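
For readers on a later release: the 0.11 API shown above (FlowMaterializer, FlowGraph, ForeachSink) changed substantially in subsequent versions. The following is only a minimal sketch of the same idea, merging two sources and then transforming the result with a source-less Flow, under the assumption that Akka 2.6 is on the classpath. The object name MergeThenTransform and its run method are hypothetical and exist only for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; assumes Akka 2.6 (not the 0.11 API from the question).
object MergeThenTransform {

  def run(): immutable.Seq[Double] = {
    // Assumption: in Akka 2.6 the implicit ActorSystem also provides the materializer.
    implicit val system: ActorSystem = ActorSystem("merge-example")
    try {
      val itemsA = Source(List(10, 20, 30, 40, 50))
      val itemsB = Source(List(60, 70, 80, 90, 100))

      // A reusable Flow with no Source attached.
      val transform = Flow[Int].map(_ * 2).map(_.toDouble)

      // merge interleaves both sources in no guaranteed order;
      // via applies the Flow to the merged stream.
      val result = itemsA.merge(itemsB).via(transform).runWith(Sink.seq)

      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
}
```

Because merge gives no ordering guarantee across the two inputs, the printed order can differ between runs; only the set of ten doubled values is stable. Replacing merge with concat in this sketch would emit all of itemsA before itemsB, matching the concat variant described above.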

cmbaxter answered May 25 '23 01:05