I'm playing with Akka Streams, and have figured out most of the basics, but I'm not clear on how to take the results of a Merge
and do further operations (map, filter, fold, etc) on it.
I'd like to modify the following code so that, instead of piping the merge to a sink, that I could instead manipulate the data further.
implicit val materializer = FlowMaterializer()
val items_a = Source(List(10,20,30,40,50))
val items_b = Source(List(60,70,80,90,100))
val sink = ForeachSink(println)
val materialized = FlowGraph { implicit builder =>
import FlowGraphImplicits._
val merge = Merge[Int]("m1")
items_a ~> merge
items_b ~> merge ~> sink
}.run()
I guess my primary problem is that I can't figure out how to make a flow component that doesn't have a source, and I can't figure out how to do a merge without using the special Merge object and the ~>
syntax.
EDIT: This question and answer was for and worked with Akka Streams 0.11
If you don't care about the semantic of Merge
where elements go downstream randomly then you could just try concat
on Source
instead like so:
items_a.concat(items_b).map(_ * 2).map(_.toString).foreach(println)
The difference here is that all items from a
will flow downstream first before any elements of b
. If you really need the behavior of Merge
, then you could consider something like the following (keep in mind that you will eventually need a sink, but you certainly can do additional transforms after the merging):
val items_a = Source(List(10,20,30,40,50))
val items_b = Source(List(60,70,80,90,100))
val sink = ForeachSink[Double](println)
val transform = Flow[Int].map(_ * 2).map(_.toDouble).to(sink)
val materialized = FlowGraph { implicit builder =>
import FlowGraphImplicits._
val merge = Merge[Int]("m1")
items_a ~> merge
items_b ~> merge ~> transform
}.run
In this example, you can see that I use the helper from the Flow
companion to create a Flow
without a specific input Source
. From there I can then attach that to the merge point to get my additional processing.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With