Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Split Akka Stream Source into two

I have an Akka Streams Source which I want to split into two sources according to a predicate.

E.g. having a source (types are simplified intentionally):

val source: Source[Either[Throwable, String], NotUsed] = ???

And two methods:

def handleSuccess(source: Source[String, NotUsed]): Future[Unit] = ???
def handleFailure(source: Source[Throwable, NotUsed]): Future[Unit] = ???

I would like to be able to split the source according to _.isRight predicate and pass the right part to handleSuccess method and left part to handleFailure method.

I tried using Broadcast splitter but it requires Sinks at the end.

like image 822
Tvaroh Avatar asked Jul 18 '16 14:07

Tvaroh


People also ask

Is Akka streams distributed?

Unlike heavier “streaming data processing” frameworks, Akka Streams are neither “deployed” nor automatically distributed.

Which are the 3 main components in a Akka stream?

Akka streams consist of three major components in it – Source, Flow and Sink.

How does Akka backpressure work?

Back-pressure. A means of flow-control, a way for consumers of data to notify a producer about their current availability, effectively slowing down the upstream producer to match their consumption speeds. In the context of Akka Streams back-pressure is always understood as non-blocking and asynchronous. Non-Blocking.

What is source in Akka?

The starting point is called Source and can be a collection, an iterator, a block of code which is evaluated repeatedly or a org.reactivestreams.Publisher. A flow with an attached input and open output is also a Source. A flow may also be defined without an attached input or output and that is then a Flow.


2 Answers

You can use divertTo to attach alternative Sink to the flow to handle Lefts: https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/divertTo.html

source
  .divertTo(handleFailureSink, _.isLeft)
  .map(rightEither => handleSuccess(rightEither.right.get()))
like image 54
amorfis Avatar answered Sep 17 '22 15:09

amorfis



Edit: this other answer with divertTo is a better solution than mine, IMO. I'll leave my answer as-is for posterity.


original answer:

This is implemented in akka-stream-contrib as PartitionWith. Add this dependency to SBT to pull it in to your project:

libraryDependencies += "com.typesafe.akka" %% "akka-stream-contrib" % "0.9"```

`PartitionWith` is shaped like a `Broadcast(2)`, but with potentially different types for each of the two outlets. You provide it with a predicate to apply to each element, and depending on the outcome, they get routed to the applicable outlet. You can then attach a `Sink` or `Flow` to each of these outlets independently as appropriate. Building on [cessationoftime's example](https://stackoverflow.com/a/39744355/147806), with the `Broadcast` replaced with a `PartitionWith`:

    val eitherSource: Source[Either[Throwable, String], NotUsed] = Source.empty
    val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
    val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))

    val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink)
                                      ((_, _, _)) { implicit b => (s, l, r) =>

      import GraphDSL.Implicits._

      val pw = b.add(
        PartitionWith.apply[Either[Throwable, String], Throwable, String](identity)
      )

      eitherSource ~> pw.in
      pw.out0 ~> leftSink
      pw.out1 ~> rightSink

      ClosedShape
    })

    val r = flow.run()
    Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)
like image 32
László van den Hoek Avatar answered Sep 16 '22 15:09

László van den Hoek