Elegant way of reusing akka-stream flows

I am looking for a way to easily reuse akka-stream flows.

I treat the Flow I intend to reuse as a function, so I would like to keep its signature like:

Flow[Input, Output, NotUsed]

When I use this flow, I would like to be able to 'call' it and keep the input alongside the result for further processing.

So I want to start with a Flow emitting [Input], apply my flow, and proceed with a Flow emitting [(Input, Output)].

Example:

val s: Source[Int, NotUsed] = Source(1 to 10)

val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString)

val via: Source[(Int, String), NotUsed] = ???

This is not possible in a straightforward way, because combining the flow with .via() gives me a stream emitting just [Output]:

val via: Source[String, NotUsed] = s.via(stringIfEven)

The alternative is to make my reusable flow emit [(Input, Output)], but this requires every flow to carry its input through all of its stages, which makes my code look bad.

So I came up with a combiner like this:

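import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Zip}

def tupledFlow[In, Out](flow: Flow[In, Out, _]): Flow[In, (In, Out), NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    // Fan the input out to two branches and zip them back into a tuple.
    val broadcast = b.add(Broadcast[In](2))
    val zip = b.add(Zip[In, Out]())

    broadcast.out(0) ~> zip.in0          // original input, passed straight through
    broadcast.out(1) ~> flow ~> zip.in1  // input transformed by the given flow

    FlowShape(broadcast.in, zip.out)
  })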

It broadcasts the input both to the given flow and, on a parallel line, directly to a 'Zip' stage, where I join the two values into a tuple. It can then be applied elegantly:

val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEven))

Everything works, but when the given flow performs a 'filter' operation, this combiner gets stuck and stops processing further events.

I guess this is due to the behaviour of 'Zip', which needs an element on every input before it emits. In my case one branch passes each element straight through, so the other branch cannot drop that element with filter(). When it does, the flow stops because Zip keeps waiting for a push that never arrives.
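The stall described above can be reproduced with a short sketch. It assumes Akka 2.6 or later (where an implicit ActorSystem supplies the materializer); the names `ZipStallDemo`, `zipTupledFlow` and `stalls`, and the two-second timeout, are made up for illustration. The wiring is the same Broadcast + Zip graph as in tupledFlow.

```scala
import java.util.concurrent.TimeoutException

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object ZipStallDemo {
  // Same Broadcast + Zip wiring as the tupledFlow in the question.
  def zipTupledFlow[In, Out](flow: Flow[In, Out, _]): Flow[In, (In, Out), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val broadcast = b.add(Broadcast[In](2))
      val zip = b.add(Zip[In, Out]())
      broadcast.out(0) ~> zip.in0
      broadcast.out(1) ~> flow ~> zip.in1
      FlowShape(broadcast.in, zip.out)
    })

  // Returns true when the stream has not completed within `timeout`.
  def stalls(timeout: FiniteDuration = 2.seconds): Boolean = {
    implicit val system: ActorSystem = ActorSystem("zip-stall-demo")
    try {
      val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString)
      val done = Source(1 to 10).via(zipTupledFlow(stringIfEven)).runWith(Sink.seq)
      Try(Await.result(done, timeout)) match {
        case Failure(_: TimeoutException) => true // the stream never finished
        case _                            => false
      }
    } finally system.terminate()
  }
}
```

The first element, 1, reaches zip.in0 but is dropped by the filter branch. Zip will not pull in0 again until in1 delivers something, and Broadcast will not pull upstream until both of its outputs have demand, so no further element is ever requested.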

Is there a better way to achieve flow composition? Is there anything I can do in my tupledFlow to get the desired behaviour when 'flow' drops elements with 'filter'?

Tomasz Bartczak asked Dec 28 '16 16:12

1 Answer

Two possible approaches, of debatable elegance, are:

1) Avoid filtering stages by turning your filter into a Flow[Int, Option[String], NotUsed]. This way you can apply your zipping wrapper around your whole graph, as you originally planned. However, the code looks more cluttered, and passing Nones around adds overhead.

val stringIfEvenOrNone = Flow[Int].map{
  case x if x % 2 == 0 => Some(x.toString)
  case _ => None
}

val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEvenOrNone)).collect{
  case (num, Some(str)) => (num,str)
}

2) Separate the filtering and transforming stages, and apply the filtering ones before your zipping wrapper. This is probably a more lightweight and better compromise.

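val filterEven = Flow[Int].filter(_ % 2 == 0)

// Named intToString so it does not clash with Any.toString when declared as a class or object member.
val intToString = Flow[Int].map(_.toString)

val tupled: Source[(Int, String), NotUsed] = s.via(filterEven).via(tupledFlow(intToString))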

EDIT

3) Posting another solution here for clarity, following the discussion in the comments.

This flow wrapper emits each element produced by a given flow, paired with the original input element that generated it. It works for any kind of inner flow (emitting 0, 1 or more elements for each input).

  def tupledFlow[In,Out](flow: Flow[In, Out, _]): Flow[In, (In,Out), NotUsed] =
    Flow[In].flatMapConcat(in => Source.single(in).via(flow).map( out => in -> out))
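
As a rough check of the claim that this works for 0, 1 or more outputs per input, here is a minimal sketch. It assumes Akka 2.6 or later (implicit ActorSystem as materializer); `TupledFlowDemo`, `run` and the `repeat` flow are hypothetical names added only for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object TupledFlowDemo {
  // The flatMapConcat-based wrapper from the answer.
  def tupledFlow[In, Out](flow: Flow[In, Out, _]): Flow[In, (In, Out), NotUsed] =
    Flow[In].flatMapConcat(in => Source.single(in).via(flow).map(out => in -> out))

  // Runs `input` through the wrapped flow and collects every emitted pair.
  def run[Out](flow: Flow[Int, Out, _], input: Range): Seq[(Int, Out)] = {
    implicit val system: ActorSystem = ActorSystem("tupled-flow-demo")
    try Await.result(Source(input).via(tupledFlow(flow)).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }

  // Emits 0 or 1 element per input.
  val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString)

  // Emits n elements for input n.
  val repeat = Flow[Int].mapConcat(n => List.fill(n)(n.toString))
}

// TupledFlowDemo.run(TupledFlowDemo.stringIfEven, 1 to 6)
//   yields (2,"2"), (4,"4"), (6,"6")
// TupledFlowDemo.run(TupledFlowDemo.repeat, 1 to 3)
//   yields (1,"1"), (2,"2"), (2,"2"), (3,"3"), (3,"3"), (3,"3")
```

One design consequence worth keeping in mind: the inner flow is materialized afresh for every input element, so any state it holds (for example in a `scan` stage) starts over with each element, and the per-element materialization has a cost.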

Stefano Bonetti answered Oct 03 '22 13:10