
How to add elements to Source dynamically?

I have example code that generates an unbounded source and consumes it:

object Main {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")
    import system.dispatcher

    implicit val materializer = ActorFlowMaterializer()

    val source: Source[String] = Source(() => {
      Iterator.continually({ "message:" + ThreadLocalRandom.current().nextInt(10000) })
    })

    source.runForeach((item: String) => { println(item) })
      .onComplete { _ => system.shutdown() }
  }

}

I want to create a class that implements:

trait MySources {
  def addToSource(item: String)
  def getSource(): Source[String]
}

I need to use it from multiple threads, for example:

class MyThread(mySources: MySources) extends Thread {
  override def run(): Unit = {
    for (i <- 1 to 1000000) { // here will be infinite loop
      mySources.addToSource(i.toString)
    }
  }
}

The full code I expect to end up with:

object Main {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")
    import system.dispatcher

    implicit val materializer = ActorFlowMaterializer()

    val sources = new MySourcesImplementation()

    for (i <- 1 to 100) {
      (new MyThread(sources)).start()
    }

    val source = sources.getSource()

    source.runForeach((item: String) => { println(item) })
      .onComplete { _ => system.shutdown() }
  }
}

How can I implement MySources?

krynio asked Mar 16 '15 09:03


2 Answers

One way to have a non-finite source is to use a special kind of actor as the source, one that mixes in the ActorPublisher trait. If you create such an actor and wrap it with a call to ActorPublisher.apply, you get a Reactive Streams Publisher instance, which you can pass to Source.apply to obtain a Source. After that, you just need to make sure your ActorPublisher class properly handles the Reactive Streams protocol for sending elements downstream: call onNext only while there is outstanding demand, and buffer elements otherwise. A very trivial example is as follows:

import akka.actor._
import akka.stream.actor._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._

object DynamicSourceExample extends App {

  implicit val system = ActorSystem("test")
  implicit val materializer = ActorFlowMaterializer()

  val actorRef = system.actorOf(Props[ActorBasedSource])
  val pub = ActorPublisher[Int](actorRef)

  Source(pub).
    map(_ * 2).
    runWith(Sink.foreach(println))

  // Feed elements into the running stream from outside, one per second
  for (i <- 1 until 20) {
    actorRef ! i.toString
    Thread.sleep(1000)
  }

}

class ActorBasedSource extends Actor with ActorPublisher[Int] {
  import ActorPublisherMessage._
  // Elements received while downstream had no outstanding demand
  var items: List[Int] = List.empty

  def receive = {
    case s: String =>
      if (totalDemand == 0)
        items = items :+ s.toInt // no demand yet: buffer the element
      else
        onNext(s.toInt)          // demand available: emit immediately

    case Request(demand) =>
      // Downstream asked for more: drain as much of the buffer as allowed
      if (demand > items.size) {
        items foreach (onNext)
        items = List.empty
      }
      else {
        val (send, keep) = items.splitAt(demand.toInt)
        items = keep
        send foreach (onNext)
      }

    case other =>
      println(s"got other $other")
  }
}

cmbaxter answered Oct 25 '22 14:10


With Akka Streams 2 you can use a source queue (Source.queue). See the related question: How to create a Source that can receive elements later via a method call?
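
For illustration, here is a minimal sketch of how the source-queue approach could back the MySources trait from the question. It assumes Akka 2.6 (the exact API differs between Akka Streams versions), and the names QueueBackedSources and QueueSourceDemo, the buffer size, and the dropNew overflow strategy are choices made for this example, not part of the original answer. The trait's Source type is written with the extra materialized-value parameter that newer versions require. Calls to offer are serialized with synchronized as a precaution, since the guarantees for concurrent producers depend on the Akka version and overflow strategy.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

// Trait from the question, adapted to the two-parameter Source type.
trait MySources {
  def addToSource(item: String): Unit
  def getSource(): Source[String, NotUsed]
}

// Hypothetical implementation backed by Source.queue (assumes Akka 2.6).
class QueueBackedSources(bufferSize: Int)(implicit system: ActorSystem)
    extends MySources {

  // preMaterialize() starts the queue stage right away, so elements can be
  // offered before getSource() is connected to a sink.
  private val (queue, source) =
    Source.queue[String](bufferSize, OverflowStrategy.dropNew).preMaterialize()

  // With dropNew, elements offered while the buffer is full are dropped
  // instead of blocking the calling thread. The returned Future is ignored
  // here; real code would probably inspect the QueueOfferResult.
  def addToSource(item: String): Unit = synchronized {
    queue.offer(item)
    ()
  }

  def getSource(): Source[String, NotUsed] = source
}

object QueueSourceDemo {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("Sys")
    import system.dispatcher

    val sources = new QueueBackedSources(1024)

    // Four producer threads push five elements each into the same source.
    (1 to 4).foreach { t =>
      new Thread(() =>
        (1 to 5).foreach(i => sources.addToSource(s"thread-$t item-$i"))
      ).start()
    }

    // Stop after the 20 expected elements so the demo can shut down.
    sources.getSource()
      .take(20)
      .runForeach(println)
      .onComplete(_ => system.terminate())
  }
}
```

Whether dropping is acceptable depends on the use case. OverflowStrategy.backpressure is the alternative when no element may be lost, but then each producer has to wait for the Future returned by offer before offering again.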

Loic answered Oct 25 '22 16:10