I have several Flow
s in my program, that I would like to process in parallel. After all are completed, I would like to trigger some action.
One way of doing it would be to send a message to an Actor after each completion, and when the Actor verifies that all flows are ready, then it can trigger the action.
I was wondering if there was anything within the akka-streams Scala DSL that I may be overlooking that would make it even simpler.
EDIT: Converting a Flow to a future would not work because, as the documentation mentions, the Future is completed right after the first event that happens in the stream. Running the following code:
implicit val system = ActorSystem("Sys")
val fm = FlowMaterializer(MaterializerSettings())
def main(args: Array[String]): Unit = {
val fut = Flow(1 second, {() => println("tick")}).toFuture(fm)
fut.onComplete{ _ =>
println("future completed")
}
}
Prints "tick", followed by "future completed", and then an infinite sequence of "tick"s.
As mentioned in the comment, I believe @Eduardo is right about converting the Flow
to a Future
. Consider this example:
implicit val system = ActorSystem("Sys")
import system.dispatcher
val text1 =
"""hello1world
foobar""".stripMargin
val text2 =
"""this1is
a1test""".stripMargin
def flowFut(text:String) = Flow(text.split("\\s").toVector)
.map(_.toUpperCase())
.map(_.replace("1", ""))
.toFuture(FlowMaterializer(MaterializerSettings()))
val fut1 = flowFut(text1)
val fut2 = flowFut(text2)
val fut3 = for{
f1 <- fut1
f2 <- fut2
} yield {
s"$f1, $f2"
}
fut3 foreach {println(_)}
Here, I run two separate transforms on each set of text lines, converting to upper and removing the #1 from any text. I then force the result of this Flow
to a Future
so I can compose the results into a new Future
which I then print out.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With