
Akka combining Sinks without access to Flows

I am using an API that accepts a single Akka Sink and fills it with data:

def fillSink(sink:Sink[String, _])

Is there a way, without delving into the depths of Akka, to handle the output with two sinks instead of one?

For example:

val mySink1: Sink[String, _] = ...
val mySink2: Sink[String, _] = ...
// something that combines mySink1 and mySink2 into bothSinks
fillSink(bothSinks)

If I had access to the Flow used by the fillSink method I could use flow.alsoTo(mySink1).to(mySink2) but the flow is not exposed.

The only workaround I have at the moment is to pass a single Sink that handles the strings and forwards them to two StringBuilders in place of mySink1/mySink2, but that feels like it defeats the point of Akka. Without spending a couple of days learning Akka, I can't tell whether there is a way to split the output across several sinks.

Thanks!

Jethro asked Nov 05 '25 10:11


1 Answer

The Sink.combine operator, which combines two or more Sinks using a provided Int => Graph[UniformFanOutShape[T, U], NotUsed] strategy function, might be what you're looking for:

def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)(strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed]

A trivialized example:

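import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Sink, Source}

// An implicit ActorSystem supplies the materializer (Akka 2.6+)
implicit val system: ActorSystem = ActorSystem("example")

val doubleSink = Sink.foreach[Int](i => println(s"Doubler: ${i*2}"))
val tripleSink = Sink.foreach[Int](i => println(s"Tripler: ${i*3}"))

// Broadcast sends every element to both sinks
val combinedSink = Sink.combine(doubleSink, tripleSink)(Broadcast[Int](_))

Source(List(1, 2, 3)).runWith(combinedSink)

// Doubler: 2
// Tripler: 3
// Doubler: 4
// Tripler: 6
// Doubler: 6
// Tripler: 9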
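
Applied to the situation in the question, the same idea might look like the sketch below. It assumes Akka 2.6 or later, where an implicit ActorSystem provides the materializer. The fillSink body here is a hypothetical stand-in for the third-party API, and the two foreach sinks are placeholders for whatever mySink1 and mySink2 really are.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, Sink, Source}

object CombinedSinkExample {
  implicit val system: ActorSystem = ActorSystem("combined-sink-example")

  // Hypothetical stand-in for the API in the question: it only sees one Sink.
  def fillSink(sink: Sink[String, _]): Unit =
    Source(List("a", "b", "c")).runWith(sink)

  // Placeholder sinks; substitute the real mySink1 / mySink2.
  val mySink1: Sink[String, _] = Sink.foreach[String](s => println(s"sink1: $s"))
  val mySink2: Sink[String, _] = Sink.foreach[String](s => println(s"sink2: $s"))

  // Option 1: Sink.combine with a Broadcast strategy.
  val bothSinks: Sink[String, NotUsed] =
    Sink.combine(mySink1, mySink2)(Broadcast[String](_))

  // Option 2: start from a fresh Flow[String]; a Flow ending in `to` is itself a Sink.
  val bothSinksViaFlow: Sink[String, NotUsed] =
    Flow[String].alsoTo(mySink1).to(mySink2)

  def main(args: Array[String]): Unit = {
    fillSink(bothSinks)
    // The actor system keeps running; call system.terminate() once the stream is done.
  }
}
```

Either value can be passed to fillSink. Note that both variants discard the materialized values of the individual sinks (the result is a Sink[String, NotUsed]), so if you need a completion signal from mySink1 or mySink2 you would have to expose it some other way.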
Leo C answered Nov 07 '25 10:11


