I'm trying to use the Source.actorRef method to create an akka.stream.scaladsl.Source object. Something of the form
import akka.stream.OverflowStrategy.fail import akka.stream.scaladsl.Source case class Weather(zip : String, temp : Double, raining : Boolean) val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail) val sunnySource = weatherSource.filter(!_.raining) ...
My question is: how do I send data to my ActorRef based Source object?
I assumed sending messages to the Source was something of the form
//does not compile weatherSource ! Weather("90210", 72.0, false) weatherSource ! Weather("02139", 32.0, true)
But weatherSource
doesn't have a !
operator or tell
method.
The documentation isn't too descriptive on how to use Source.actorRef, it just says you can...
Thank you in advance for your review and response.
ActorRefFactory, an interface which is implemented by ActorSystem and akka. actor. ActorContext. This means actors can be created top-level in the ActorSystem or as children of an existing actor, but only from within that actor. ActorRefs can be freely shared among actors by message passing.
Unlike heavier “streaming data processing” frameworks, Akka Streams are neither “deployed” nor automatically distributed.
The starting point is called Source and can be a collection, an iterator, a block of code which is evaluated repeatedly or a org.reactivestreams.Publisher. A flow with an attached input and open output is also a Source. A flow may also be defined without an attached input or output and that is then a Flow.
Back-pressure. A means of flow-control, a way for consumers of data to notify a producer about their current availability, effectively slowing down the upstream producer to match their consumption speeds. In the context of Akka Streams back-pressure is always understood as non-blocking and asynchronous.
You need a Flow
:
import akka.stream.OverflowStrategy.fail import akka.stream.scaladsl.Source import akka.stream.scaladsl.{Sink, Flow} case class Weather(zip : String, temp : Double, raining : Boolean) val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail) val sunnySource = weatherSource.filter(!_.raining) val ref = Flow[Weather] .to(Sink.ignore) .runWith(sunnySource) ref ! Weather("02139", 32.0, true)
Remember this is all experimental and may change!
As @Noah points out the experimental nature of akka-streams, his answer might not work with the 1.0 release. I had to follow the example given by this example:
implicit val materializer = ActorMaterializer() val (actorRef: ActorRef, publisher: Publisher[TweetInfo]) = Source.actorRef[TweetInfo](1000, OverflowStrategy.fail).toMat(Sink.publisher)(Keep.both).run() actorRef ! TweetInfo(...) val source: Source[TweetInfo, Unit] = Source[TweetInfo](publisher)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With