Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use an Akka Streams SourceQueue with PlayFramework

Tags:

I would like to use a SourceQueue to push elements dynamically into an Akka Stream source. Play controller needs a Source to be able to stream a result using the chuncked method.
As Play uses its own Akka Stream Sink under the hood, I can't materialize the source queue myself using a Sink because the source would be consumed before it's used by the chunked method (except if I use the following hack).

I'm able to make it work if I pre-materialize the source queue using a reactive-streams publisher, but it's a kind of 'dirty hack' :

def sourceQueueAction = Action{

    val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()

    //stupid example to push elements dynamically
    val tick = Source.tick(0 second, 1 second, "tick")
    tick.runForeach(t => queue.offer(t))

    Ok.chunked(Source.fromPublisher(pub))
  }

Is there a simpler way to use an Akka Streams SourceQueue with PlayFramework?

Thanks

like image 521
Loic Avatar asked Apr 04 '16 08:04

Loic


People also ask

How do Akka streams work?

Akka Streams components work with a demand-based protocol. In other words, data flows through the graph as a response to demand from receivers. Producers then comply and send more elements downstream. A second (transparent) protocol kicks in when production of elements is faster than demand.

Is Akka streams distributed?

Unlike heavier “streaming data processing” frameworks, Akka Streams are neither “deployed” nor automatically distributed.


2 Answers

The solution is to use mapMaterializedValue on the source to get a future of its queue materialization :

def sourceQueueAction = Action {
    val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))

    futureQueue.map { queue =>
      Source.tick(0.second, 1.second, "tick")
        .runForeach (t => queue.offer(t))
    }
    Ok.chunked(queueSource)

  }

  //T is the source type, here String
  //M is the materialization type, here a SourceQueue[String]
  def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
    val p = Promise[M]
    val s = src.mapMaterializedValue { m =>
      p.trySuccess(m)
      m
    }
    (s, p.future)
  }
like image 171
Loic Avatar answered Jan 02 '23 13:01

Loic


Would like to share an insight I got today, though it may not be appropriate to your case with Play.

Instead of thinking of a Source to trigger, one can often turn the problem upside down and provide a Sink to the function that does the sourcing.

In such a case, the Sink would be the "recipe" (non-materialized) stage and we can now use Source.queue and materialize it right away. Got queue. Got the flow that it runs.

like image 22
akauppi Avatar answered Jan 02 '23 14:01

akauppi