Is there an Akka streams combinator for doing the following (or something to that effect)? (Let's call it and
for now.)
(flow1: Flow[I, O, Mat]).and[O2](flow2: Flow[I, O2, Mat]): Flow[I, (O, O2), Mat]
The semantics would be that whatever the source, its elements will be passed to both Flow
s, and their outputs will be combined into a new Flow
as a tuple. (For those familiar with arrows from category theory flavoured functional programming, I am looking for something like &&&
.)
There are two combinators in the library that looked relevant, namely zip
and alsoTo
. But the former accepts a SourceShape
, and the latter, a SinkShape
. Neither would admit a GraphShape
. Why is this the case?
My use case is something like following:
someSource
.via(someFlowThatReturnsUnit.and(Flow.apply))
.runWith(someSink)
Failing to find something like .and
, I modified my original Flow
like this:
someSource
.via(someFlowThatDoesWhateverItWasDoingEarlierButNowAlsoEmitsInputsAsIs)
.runWith(someSink)
This works, but I am looking for a cleaner, more compositional solution.
Notice
As Viktor Klang noted in the comments: zipping into a Tuple2[O,O2]
is only viable when it is known that both flows, flow1
& flow2
, are 1:1 with respect to input element count and output element count.
Graph Based Solution
A tuple construct can be created inside of a Graph. In fact, your question almost perfectly matches the introductory example:
Extending the sample code in the link, you can use Broadcast and Zip
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source(1 to 10)
val out = Sink.ignore
val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Zip[Int, Int]()) //different than link
val f1, f2, f4 = Flow[Int].map(_ + 10)
val f3 = Flow[(Int, Int)].map(t => t._1 + t._2) //different than link
in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
bcast ~> f4 ~> merge
ClosedShape
})//end RunnableGraph.fromGraph
Somewhat Hacky Stream Solution
If you are looking for a pure stream solution, it is possible using intermediate streams but the Mat
would not be maintained and it involves materialization of 2 streams for each input element:
def andFlows[I, O, O2] (maxConcurrentSreams : Int)
(flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed])
(implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (O, O2), _] =
Flow[I].mapAsync(maxConcurrentStreams){ i =>
val o : Future[O] = Source
.single(i)
.via(flow1)
.to(Sink.head[O])
.run()
val o2 : Future[O2] = Source
.single(i)
.via(flow2)
.to(Sink.head[O2])
.run()
o zip o2
}//end Flow[I].mapAsync
Generic Zipping
If you want to make this zipping generic, for most Flows, then the output type will have to be (Seq[O], Seq[O2])
. This type can be generated by using Sink.seq
instead of Sink.head
in the above andFlows
function:
def genericAndFlows[I, O, O2] (maxConcurrentSreams : Int)
(flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed])
(implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (Seq[O], Seq[O2]), _] =
Flow[I].mapAsync(maxConcurrentStreams){ i =>
val o : Future[Seq[O]] = Source
.single(i)
.via(flow1)
.to(Sink.seq[O])
.run()
val o2 : Future[Seq[O2]] = Source
.single(i)
.via(flow2)
.to(Sink.seq[O2])
.run()
o zip o2
}//end Flow[I].mapAsync
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With