Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to create a Source that can receive elements later via a method call?

I would like to create a Source and later push elements on it, like in:

val src = ... // create the Source here // and then, do something like this pushElement(x1, src) pushElement(x2, src) 

What is the recommended way to do this?

Thanks!

like image 559
ale64bit Avatar asked Jun 21 '15 13:06

ale64bit


Video Answer


2 Answers

There are three ways this can be achieved:

1. Post Materialization with SourceQueue

You can use Source.queue that materializes the Flow into a SourceQueue:

case class Weather(zipCode : String, temperature : Double, raining : Boolean)  val bufferSize = 100  //if the buffer fills up then this strategy drops the oldest elements //upon the arrival of a new element. val overflowStrategy = akka.stream.OverflowStrategy.dropHead  val queue = Source.queue(bufferSize, overflowStrategy)                   .filter(!_.raining)                   .to(Sink foreach println)                   .run() // in order to "keep" the queue Materialized value instead of the Sink's  queue offer Weather("02139", 32.0, true) 

2. Post Materialization with Actor

There is a similar question and answer here, the gist being that you materialize the stream as an ActorRef and send messages to that ref:

val ref = Source.actorRef[Weather](Int.MaxValue, fail)                 .filter(!_.raining)                 .to(Sink foreach println )                 .run() // in order to "keep" the ref Materialized value instead of the Sink's  ref ! Weather("02139", 32.0, true) 

3. Pre Materialization with Actor

Similarly, you could explicitly create an Actor that contains a message buffer, use that Actor to create a Source, and then send that Actor messages as described in the answer here:

object WeatherForwarder {   def props : Props = Props[WeatherForwarder] }  //see provided link for example definition class WeatherForwarder extends Actor {...}  val actorRef = actorSystem actorOf WeatherForwarder.props   //note the stream has not been instatiated yet actorRef ! Weather("02139", 32.0, true)   //stream already has 1 Weather value to process which is sitting in the  //ActorRef's internal buffer val stream = Source(ActorPublisher[Weather](actorRef)).runWith{...} 
like image 142
Ramón J Romero y Vigil Avatar answered Sep 30 '22 10:09

Ramón J Romero y Vigil


Since Akka 2.5 Source has a preMaterialize method.

According to the documentation, this looks like the indicated way to do what you ask:

There are situations in which you require a Source materialized value before the Source gets hooked up to the rest of the graph. This is particularly useful in the case of “materialized value powered” Sources, like Source.queue, Source.actorRef or Source.maybe.

Below an example on how this would be with a SourceQueue. Elements are pushed to the queue before and after materialization, as well as from within the Flow:

import akka.actor.ActorSystem import akka.stream.scaladsl._ import akka.stream.{ActorMaterializer, OverflowStrategy}  implicit val system = ActorSystem("QuickStart") implicit val materializer = ActorMaterializer()   val sourceDecl = Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure) val (sourceMat, source) = sourceDecl.preMaterialize()  // Adding element before actual materialization sourceMat.offer("pre materialization element")  val flow = Flow[String].map { e =>   if(!e.contains("new")) {     // Adding elements from within the flow     sourceMat.offer("new element generated inside the flow")   }   s"Processing $e" }  // Actually materializing with `run` source.via(flow).to(Sink.foreach(println)).run()  // Adding element after materialization sourceMat.offer("post materialization element") 

Output:

Processing pre materialization element Processing post materialization element Processing new element generated inside the flow Processing new element generated inside the flow 
like image 32
PetrosP Avatar answered Sep 30 '22 10:09

PetrosP