
How to compose two Flows side-by-side?

Is there an Akka streams combinator for doing the following (or something to that effect)? (Let's call it and for now.)

(flow1: Flow[I, O, Mat]).and[O2](flow2: Flow[I, O2, Mat]): Flow[I, (O, O2), Mat]

The semantics would be that whatever the source, its elements will be passed to both Flows, and their outputs will be combined into a new Flow as a tuple. (For those familiar with arrows from category theory flavoured functional programming, I am looking for something like &&&.)
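
As a rough illustration of the intended semantics on plain functions rather than Flows, something like the following sketch is what I have in mind. The name fanout is hypothetical and is not an Akka API.

```scala
// Hypothetical helper on plain functions, playing the role of the arrow combinator &&&:
// the same input is fed to both functions and the two results are paired.
def fanout[I, O, O2](f: I => O, g: I => O2): I => (O, O2) =
  i => (f(i), g(i))

val lengthAndUpper = fanout((s: String) => s.length, (s: String) => s.toUpperCase)
// lengthAndUpper("akka") == (4, "AKKA")
```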

There are two combinators in the library that looked relevant, namely zip and alsoTo. But the former accepts a SourceShape, and the latter, a SinkShape. Neither would admit a GraphShape. Why is this the case?

My use case is something like the following:

someSource
  .via(someFlowThatReturnsUnit.and(Flow.apply))
  .runWith(someSink)

Failing to find something like .and, I modified my original Flow like this:

someSource
  .via(someFlowThatDoesWhateverItWasDoingEarlierButNowAlsoEmitsInputsAsIs)
  .runWith(someSink)

This works, but I am looking for a cleaner, more compositional solution.

missingfaktor asked Nov 18 '17 15:11


1 Answer

Notice

As Viktor Klang noted in the comments: zipping into a Tuple2[O,O2] is only viable when it is known that both flows, flow1 & flow2, are 1:1 with respect to input element count and output element count.
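
To see why the 1:1 requirement matters, here is a small sketch using plain Scala collections as a stand-in for the outputs of the two flows. This is only an analogy, not stream code: in a real stream, backpressure between the fan-out and the zip may cause the stream to stall rather than produce mismatched pairs.

```scala
// Plain collections standing in for the two flows' outputs (an analogy, not stream code).
val inputs = List(1, 2, 3, 4, 5, 6)

val out1 = inputs.map(_ * 10)         // 1:1: exactly one output per input
val out2 = inputs.filter(_ % 2 == 0)  // not 1:1: odd inputs produce no output

// Positional zipping pairs outputs that originated from different inputs.
val zipped = out1.zip(out2)
// zipped == List((10, 2), (20, 4), (30, 6))
```

Here 10 came from input 1 but is paired with 2, so the tuples no longer describe a single input element.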

Graph Based Solution

A tuple construct can be created inside of a Graph. In fact, your question almost perfectly matches the introductory example:

Extending the sample code in the link, you can use Broadcast and Zip:

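import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source, Zip}

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Source(1 to 10)
  val out = Sink.ignore

  val bcast = builder.add(Broadcast[Int](2))

  val zip = builder.add(Zip[Int, Int]()) // different than link: Zip instead of Merge

  val f1, f2, f4 = Flow[Int].map(_ + 10)

  val f3 = Flow[(Int, Int)].map(t => t._1 + t._2) // different than link: sums each pair

  // Zip has two separately typed inlets, so each branch names its port explicitly
  in ~> f1 ~> bcast ~> f2 ~> zip.in0
              bcast ~> f4 ~> zip.in1
  zip.out ~> f3 ~> out
  ClosedShape
}) // end RunnableGraph.fromGraph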
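
The same Broadcast and Zip wiring can also be wrapped in a FlowShape, which gives something close to the combinator asked for in the question. The following is a minimal sketch, not a library API: the name and is hypothetical, the materialized values of both flows are discarded (the result materializes NotUsed), and it assumes both flows emit exactly one element per input element.

```scala
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Zip}

// Hypothetical helper (not part of Akka): sends every input element to both
// flows and pairs their outputs positionally.
// Assumption: flow1 and flow2 are both 1:1 (one output per input element).
def and[I, O, O2](flow1: Flow[I, O, Any], flow2: Flow[I, O2, Any]): Flow[I, (O, O2), NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val bcast = builder.add(Broadcast[I](2)) // duplicates each input element
    val zip   = builder.add(Zip[O, O2]())    // pairs the two branch outputs

    bcast.out(0) ~> flow1 ~> zip.in0
    bcast.out(1) ~> flow2 ~> zip.in1

    // Expose the single open inlet and outlet as a Flow
    FlowShape(bcast.in, zip.out)
  })
```

With this helper, the use case from the question could be written as someSource.via(and(someFlowThatReturnsUnit, Flow[I])).runWith(someSink), where I stands for the element type of someSource.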

Somewhat Hacky Stream Solution

If you are looking for a pure stream solution, it is possible using intermediate streams, but the materialized values (Mat) of the two flows are not preserved and two streams are materialized for each input element:

def andFlows[I, O, O2] (maxConcurrentStreams : Int)
                       (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed])
                       (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (O, O2), NotUsed] = 
  Flow[I].mapAsync(maxConcurrentStreams){ i =>

    // runWith keeps the Sink's materialized Future; `.to(sink).run()` would yield NotUsed.
    // Sink.head fails the Future if the flow emits no element for this input.
    val o  : Future[O]  = Source
                           .single(i)
                           .via(flow1)
                           .runWith(Sink.head[O])

    val o2 : Future[O2] = Source
                           .single(i)
                           .via(flow2)
                           .runWith(Sink.head[O2])

    o zip o2
  }//end Flow[I].mapAsync

Generic Zipping

If you want to make this zipping generic, for most Flows, then the output type will have to be (Seq[O], Seq[O2]). This type can be generated by using Sink.seq instead of Sink.head in the above andFlows function:

def genericAndFlows[I, O, O2] (maxConcurrentStreams : Int)
                              (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed])
                              (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (Seq[O], Seq[O2]), NotUsed] = 
  Flow[I].mapAsync(maxConcurrentStreams){ i =>

    // runWith keeps the Sink's materialized Future; Sink.seq collects every
    // element the flow emits for this single input (possibly none).
    val o  : Future[Seq[O]]  = Source
                                .single(i)
                                .via(flow1)
                                .runWith(Sink.seq[O])

    val o2 : Future[Seq[O2]] = Source
                                .single(i)
                                .via(flow2)
                                .runWith(Sink.seq[O2])

    o zip o2
  }//end Flow[I].mapAsync

Ramón J Romero y Vigil answered Nov 03 '22 02:11