 

Akka Stream return object from Sink

I've got a SourceQueue. When I offer an element to it, I want it to pass through the stream, and when it reaches the sink I want the output returned to the code that offered the element (similar to how Sink.head returns an element to the RunnableGraph.run() call).

How do I achieve this? A simple example of my problem would be:

val source = Source.queue[String](100, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.ReturnTheStringSomehow
val graph = source.via(flow).to(sink).run()

val x = graph.offer("foo")
println(x) // Output should be "Modified foo"
val y = graph.offer("bar")
println(y) // Output should be "Modified bar"
val z = graph.offer("baz")
println(z) // Output should be "Modified baz"

Edit: For the example given in this question, Vladimir Matveev provided the best answer. However, note that his solution only works if the elements reach the sink in the same order they were offered to the source. If that cannot be guaranteed, the order of elements in the sink may differ and the outcome might not be what you expect.
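A plain-Scala sketch (no Akka; all names hypothetical) of that ordering pitfall: if replies are matched to callers purely by FIFO order, an out-of-order completion hands a caller the wrong result.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Callers enqueue a Promise and expect answers to arrive in the same order.
val waiting = new ConcurrentLinkedQueue[Promise[String]]()

def ask(element: String): Promise[String] = {
  val p = Promise[String]()
  waiting.add(p)
  p
}

val pFoo = ask("foo")
val pBar = ask("bar")

// The "sink" completes promises in arrival order of the results.
def resultArrived(result: String): Unit = waiting.poll().success(result)

// If "bar" finishes first, its result goes to the caller who asked for "foo".
resultArrived("Modified bar")
resultArrived("Modified foo")

Await.result(pFoo.future, 1.second) // "Modified bar" -- misattributed
```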

asked Mar 11 '23 by RemcoW

2 Answers

I believe it is simpler to use the already existing primitive for pulling values from a stream, called Sink.queue. Here is an example:

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.stream.{Attributes, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

// assumes an implicit ActorSystem/Materializer is in scope
val source = Source.queue[String](128, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.queue[String]().withAttributes(Attributes.inputBuffer(1, 1))

val (sourceQueue, sinkQueue) = source.via(flow).toMat(sink)(Keep.both).run()

def getNext: String = Await.result(sinkQueue.pull(), 1.second).get

sourceQueue.offer("foo")
println(getNext)

sourceQueue.offer("bar")
println(getNext)

sourceQueue.offer("baz")
println(getNext)

It does exactly what you want.

Note that setting the inputBuffer attribute for the queue sink may or may not matter for your use case: if you don't set it, the buffer will be zero-sized and data won't flow through the stream until you invoke pull() on the sink.

sinkQueue.pull() yields a Future[Option[T]], which is completed successfully with Some if the sink receives an element, or with a failure if the stream fails. If the stream completes normally, the future is completed with None. In this particular example I'm ignoring that by calling Option.get, but you would probably want custom logic to handle those cases.
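A hedged sketch of how that custom logic might look, using a plain Future[Option[String]] as a stand-in for the result of sinkQueue.pull():

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Stand-in for sinkQueue.pull(): Some(element), None (stream completed),
// or a failed future (stream failed).
def handlePull(pulled: Future[Option[String]]): String =
  Await.result(
    pulled.map {
      case Some(element) => element              // an element reached the sink
      case None          => "<stream completed>" // stream finished normally
    }.recover {
      case ex => s"<stream failed: ${ex.getMessage}>"
    },
    1.second
  )

handlePull(Future.successful(Some("Modified foo"))) // "Modified foo"
handlePull(Future.successful(None))                 // "<stream completed>"
```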

answered Mar 13 '23 by Vladimir Matveev


Well, you know what the offer() method returns if you take a look at its definition :) What you can do is create a Source.queue[(Promise[String], String)], write a helper function that pushes a pair into the stream via offer (making sure the offer doesn't fail because the queue might be full), complete the promise inside your stream, and use the promise's future to observe completion from the external code.
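A minimal plain-Scala sketch of that pairing technique (no Akka; a blocking queue and a worker thread stand in for the stream, and all names are hypothetical). The point is that each request carries its own Promise, so the result is correlated by identity rather than by order:

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// The "stream": a single consumer processing (promise, element) pairs,
// completing each promise with the result so the offering side gets it back.
val queue = new LinkedBlockingQueue[(Promise[String], String)]()

val worker = new Thread(() => {
  while (true) {
    val (p, element) = queue.take()
    p.success(s"Modified $element") // complete the promise inside the "stream"
  }
})
worker.setDaemon(true)
worker.start()

def offer(element: String): Future[String] = {
  val p = Promise[String]()
  queue.put(p -> element)
  p.future
}

Await.result(offer("foo"), 1.second) // "Modified foo"
```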

I do that to throttle rate to external API used from multiple places of my project.

Here is how it looked in my project before Typesafe added Hub sources to Akka:

import java.util.concurrent.ConcurrentLinkedDeque

import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{OverflowStrategy, QueueOfferResult}

import scala.util.Success

// assumes an implicit ActorSystem/Materializer is in scope
private val queue = Source.queue[(Promise[String], String)](100, OverflowStrategy.backpressure)
  .toMat(Sink.foreach[(Promise[String], String)] { case (p, param) =>
    p.complete(Success(param.reverse))
  })(Keep.left)
  .run()

private val futureDeque = new ConcurrentLinkedDeque[Future[String]]()

private def sendQueuedRequest(request: String): Future[String] = {

  val p = Promise[String]()

  val offerFuture = queue.offer(p -> request)

  def addToQueue(future: Future[String]): Future[String] = {
    futureDeque.addLast(future)
    future.onComplete(_ => futureDeque.remove(future))
    future
  }

  offerFuture.flatMap {
    case QueueOfferResult.Enqueued =>
      addToQueue(p.future)
    // any other QueueOfferResult fails the future and falls through to recoverWith
  }.recoverWith {
    case _ =>
      val first = futureDeque.pollFirst()
      if (first != null)
        addToQueue(first.flatMap(_ => sendQueuedRequest(request)))
      else
        sendQueuedRequest(request)
  }
}

I realize that this shared deque may become a bottleneck and could grow indefinitely, but because the API calls in my project are made only from other Akka streams, which are backpressured, I never have more than a dozen items in futureDeque. Your situation may differ.

If you create MergeHub.source[(Promise[String], String)]() instead, you'll get a reusable sink. Then, every time you need to process an item, you build a complete graph and run it. In that case you won't need the hacky Java container to queue requests.

answered Mar 13 '23 by expert