I have example code to generate an unbound source and working with it:
object Main {
def main(args : Array[String]): Unit = { implicit val system = ActorSystem("Sys") import system.dispatcher implicit val materializer = ActorFlowMaterializer() val source: Source[String] = Source(() => { Iterator.continually({ "message:" + ThreadLocalRandom.current().nextInt(10000)}) }) source.runForeach((item:String) => { println(item) }) .onComplete{ _ => system.shutdown() } }
}
I want to create class which implements:
trait MySources { def addToSource(item: String) def getSource() : Source[String] }
And I need use it with multiple threads, for example:
class MyThread(mySources: MySources) extends Thread { override def run(): Unit = { for(i <- 1 to 1000000) { // here will be infinite loop mySources.addToSource(i.toString) } } }
And expected full code:
object Main { def main(args : Array[String]): Unit = { implicit val system = ActorSystem("Sys") import system.dispatcher implicit val materializer = ActorFlowMaterializer() val sources = new MySourcesImplementation() for(i <- 1 to 100) { (new MyThread(sources)).start() } val source = sources.getSource() source.runForeach((item:String) => { println(item) }) .onComplete{ _ => system.shutdown() } } }
How to implement MySources
?
One way to have a non-finite source is to use a special kind of actor as the source, one that mixes in the ActorPublisher
trait. If you create one of those kinds of actors, and then wrap with a call to ActorPublisher.apply
, you end up with a Reactive Streams Publisher
instance and with that, you can use an apply
from Source
to generate a Source
from it. After that, you just need to make sure your ActorPublisher
class properly handles the Reactive Streams protocol for sending elements downstream and you are good to go. A very trivial example is as follows:
import akka.actor._ import akka.stream.actor._ import akka.stream.ActorFlowMaterializer import akka.stream.scaladsl._ object DynamicSourceExample extends App{ implicit val system = ActorSystem("test") implicit val materializer = ActorFlowMaterializer() val actorRef = system.actorOf(Props[ActorBasedSource]) val pub = ActorPublisher[Int](actorRef) Source(pub). map(_ * 2). runWith(Sink.foreach(println)) for(i <- 1 until 20){ actorRef ! i.toString Thread.sleep(1000) } } class ActorBasedSource extends Actor with ActorPublisher[Int]{ import ActorPublisherMessage._ var items:List[Int] = List.empty def receive = { case s:String => if (totalDemand == 0) items = items :+ s.toInt else onNext(s.toInt) case Request(demand) => if (demand > items.size){ items foreach (onNext) items = List.empty } else{ val (send, keep) = items.splitAt(demand.toInt) items = keep send foreach (onNext) } case other => println(s"got other $other") } }
With Akka Streams 2 you can use a sourceQueue : How to create a Source that can receive elements later via a method call?
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With