I've got a SourceQueue
. When I offer an element to this I want it to pass through the Stream
and when it reaches the Sink
have the output returned to the code that offered this element (similar as Sink.head
returns an element to the RunnableGraph.run()
call).
How do I achieve this? A simple example of my problem would be:
val source = Source.queue[String](100, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.ReturnTheStringSomehow
val graph = source.via(flow).to(sink).run()
val x = graph.offer("foo")
println(x) // Output should be "Modified foo"
val y = graph.offer("bar")
println(y) // Output should be "Modified bar"
val z = graph.offer("baz")
println(z) // Output should be "Modified baz"
Edit: For the example I have given in this question Vladimir Matveev provided the best answer. However, it should be noted that this solution only works if the elements are going into the sink
in the same order they were offered to the source
. If this cannot be guaranteed the order of the elements in the sink
may differ and the outcome might be different from what is expected.
I believe it is simpler to use the already existing primitive for pulling values from a stream, called Sink.queue
. Here is an example:
val source = Source.queue[String](128, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.queue[String]().withAttributes(Attributes.inputBuffer(1, 1))
val (sourceQueue, sinkQueue) = source.via(flow).toMat(sink)(Keep.both).run()
def getNext: String = Await.result(sinkQueue.pull(), 1.second).get
sourceQueue.offer("foo")
println(getNext)
sourceQueue.offer("bar")
println(getNext)
sourceQueue.offer("baz")
println(getNext)
It does exactly what you want.
Note that setting the inputBuffer
attribute for the queue sink may or may not be important for your use case - if you don't set it, the buffer will be zero-sized and the data won't flow through the stream until you invoke the pull()
method on the sink.
sinkQueue.pull()
yields a Future[Option[T]]
, which will be completed successfully with Some
if the sink receives an element or with a failure if the stream fails. If the stream completes normally, it will be completed with None
. In this particular example I'm ignoring this by using Option.get
but you would probably want to add custom logic to handle this case.
Well, you know what offer()
method returns if you take a look at its definition :) What you can do is to create Source.queue[(Promise[String], String)]
, create helper function that pushes pair to stream via offer
, make sure offer
doesn't fail because queue might be full, then complete promise inside your stream and use future of the promise to catch completion event in external code.
I do that to throttle rate to external API used from multiple places of my project.
Here is how it looked in my project before Typesafe added Hub sources to akka
import scala.concurrent.Promise
import scala.concurrent.Future
import java.util.concurrent.ConcurrentLinkedDeque
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{OverflowStrategy, QueueOfferResult}
import scala.util.Success
private val queue = Source.queue[(Promise[String], String)](100, OverflowStrategy.backpressure)
.toMat(Sink.foreach({ case (p, param) =>
p.complete(Success(param.reverse))
}))(Keep.left)
.run
private val futureDeque = new ConcurrentLinkedDeque[Future[String]]()
private def sendQueuedRequest(request: String): Future[String] = {
val p = Promise[String]
val offerFuture = queue.offer(p -> request)
def addToQueue(future: Future[String]): Future[String] = {
futureDeque.addLast(future)
future.onComplete(_ => futureDeque.remove(future))
future
}
offerFuture.flatMap {
case QueueOfferResult.Enqueued =>
addToQueue(p.future)
}.recoverWith {
case ex =>
val first = futureDeque.pollFirst()
if (first != null)
addToQueue(first.flatMap(_ => sendQueuedRequest(request)))
else
sendQueuedRequest(request)
}
}
I realize that blocking synchronized queue may be bottleneck and may grow indefinitely but because API calls in my project are made only from other akka streams which are backpressured I never have more than dozen items in futureDeque
. Your situation may differ.
If you create MergeHub.source[(Promise[String], String)]()
instead you'll get reusable sink. Thus every time you need to process item you'll create complete graph and run it. In that case you won't need hacky java container to queue requests.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With