I would like to use a SourceQueue to push elements dynamically into an Akka Stream source.
A Play controller needs a Source to stream a result with the chunked method.
Because Play runs the stream with its own Akka Streams Sink under the hood, I can't materialize the source queue myself with a Sink: the source would already be consumed before the chunked method uses it (unless I use the following hack).
I can make it work by pre-materializing the source queue with a reactive-streams publisher, but that feels like a dirty hack:
def sourceQueueAction = Action {
  // Materialize the queue right away by running it into a publisher sink
  val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()
  // Contrived example: push an element into the queue every second
  val tick = Source.tick(0.seconds, 1.second, "tick")
  tick.runForeach(t => queue.offer(t))
  Ok.chunked(Source.fromPublisher(pub))
}
Is there a simpler way to use an Akka Streams SourceQueue with PlayFramework?
Thanks
Akka Streams components work with a demand-based protocol. In other words, data flows through the graph as a response to demand from receivers. Producers then comply and send more elements downstream. A second (transparent) protocol kicks in when production of elements is faster than demand.
Unlike heavier “streaming data processing” frameworks, Akka Streams are neither “deployed” nor automatically distributed.
The solution is to use mapMaterializedValue on the source to get a future of its queue materialization:
def sourceQueueAction = Action {
  val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))
  // Runs once Play has materialized the source; needs an implicit ExecutionContext in scope
  futureQueue.foreach { queue =>
    Source.tick(0.seconds, 1.second, "tick")
      .runForeach(t => queue.offer(t))
  }
  Ok.chunked(queueSource)
}
// T is the element type, here String
// M is the materialized value type, here a SourceQueue[String]
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
  val p = Promise[M]()
  val s = src.mapMaterializedValue { m =>
    // Complete the promise when the source is materialized, then pass the value through
    p.trySuccess(m)
    m
  }
  (s, p.future)
}
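If you are on a newer Akka version, a possible alternative to the hand-written helper is Source's preMaterialize, which returns the materialized value together with a Source you can hand on. The sketch below is not Play code: it assumes Akka 2.6 (where an implicit ActorSystem supplies the materializer), and the object name PreMaterializeSketch and the sample elements are made up for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical standalone sketch, assuming Akka 2.6
object PreMaterializeSketch {

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    import system.dispatcher
    try {
      // preMaterialize gives back the queue plus a Source wired to it
      val (queue, source): (SourceQueueWithComplete[String], Source[String, NotUsed]) =
        Source.queue[String](10, OverflowStrategy.fail).preMaterialize()

      // In a Play action, `source` is what would be passed to Ok.chunked;
      // here a Sink.seq stands in for Play's own sink.
      val collected = source.runWith(Sink.seq)

      // Offer elements one after another, then complete the queue
      for {
        _ <- queue.offer("tick")
        _ <- queue.offer("tock")
      } yield queue.complete()

      Await.result(collected, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The trade-off compared with peekMatValue is that the queue exists before Play runs the stream, so elements offered early sit in the queue's buffer until the response is consumed.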
I would like to share an insight I had today, though it may not apply to your case with Play.
Instead of looking for a Source to trigger, one can often turn the problem upside down and pass a Sink to the function that does the sourcing.
In that case, the Sink is the "recipe" (non-materialized) stage, and we can use Source.queue and materialize it right away. That gives us both the queue and the running flow it feeds.
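A minimal sketch of that inversion, assuming Akka 2.6 (where an implicit ActorSystem supplies the materializer). The names SinkInversionSketch and produceInto are hypothetical, and Sink.seq is only a stand-in for whatever Sink the caller would really provide.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical standalone sketch, assuming Akka 2.6
object SinkInversionSketch {

  // The caller supplies the Sink (still just a recipe), so the queue
  // can be connected to it and materialized here, right away.
  def produceInto[Mat](sink: Sink[String, Mat])(
      implicit system: ActorSystem): (SourceQueueWithComplete[String], Mat) =
    Source.queue[String](10, OverflowStrategy.fail).toMat(sink)(Keep.both).run()

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    import system.dispatcher
    try {
      val (queue, collected) = produceInto(Sink.seq[String])

      // Offer elements one after another, then complete the queue
      for {
        _ <- queue.offer("a")
        _ <- queue.offer("b")
      } yield queue.complete()

      Await.result(collected, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Because the function receives the Sink rather than returning a Source, nothing has to be peeked at or pre-materialized: Keep.both hands back the queue and the sink's own materialized value in one step.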