Accessing the underlying ActorRef of an akka stream Source created by Source.actorRef

I'm trying to use the Source.actorRef method to create an akka.stream.scaladsl.Source object. Something of the form

    import akka.stream.OverflowStrategy.fail
    import akka.stream.scaladsl.Source

    case class Weather(zip : String, temp : Double, raining : Boolean)

    val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

    val sunnySource = weatherSource.filter(!_.raining)
    ...

My question is: how do I send data to my ActorRef based Source object?

I assumed sending messages to the Source was something of the form

    // does not compile
    weatherSource ! Weather("90210", 72.0, false)
    weatherSource ! Weather("02139", 32.0, true)

But weatherSource doesn't have a ! operator or tell method.

The documentation isn't very descriptive about how to use Source.actorRef; it just says you can...

Thank you in advance for your review and response.

Ramón J Romero y Vigil asked Jun 11 '15 15:06

2 Answers

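You need to run the stream. The ActorRef is the Source's materialized value, so it only exists once the source has been connected to a Sink and run, for example through a Flow: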

    import akka.stream.OverflowStrategy.fail
    import akka.stream.scaladsl.Source
    import akka.stream.scaladsl.{Sink, Flow}

    case class Weather(zip : String, temp : Double, raining : Boolean)

    val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

    val sunnySource = weatherSource.filter(!_.raining)

    // Running the stream needs an implicit materializer in scope.
    // runWith returns the source's materialized value: the ActorRef.
    val ref = Flow[Weather]
      .to(Sink.ignore)
      .runWith(sunnySource)

    ref ! Weather("02139", 32.0, true)

Remember this is all experimental and may change!
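
For readers on a newer release, here is a minimal sketch of the same idea. It assumes Akka 2.6, where Source.actorRef takes a completion matcher and a failure matcher in addition to the buffer size and overflow strategy, and where an implicit ActorSystem is enough to run a stream. The StreamDone marker, the WeatherDemo object, and the buffer size of 100 are illustrative choices, not part of the original post.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object WeatherDemo {
  final case class Weather(zip: String, temp: Double, raining: Boolean)

  // Hypothetical marker message that tells the source to complete.
  case object StreamDone

  def run(): Seq[Weather] = {
    // Assumption: Akka 2.6, where the implicit system provides the materializer.
    implicit val system: ActorSystem = ActorSystem("weather")

    // The source is only a blueprint; no actor exists yet.
    val weatherSource: Source[Weather, ActorRef] = Source.actorRef[Weather](
      completionMatcher = { case StreamDone => CompletionStrategy.draining },
      failureMatcher = PartialFunction.empty[Any, Throwable],
      bufferSize = 100,
      overflowStrategy = OverflowStrategy.fail
    )

    // Keep.both keeps the source's ActorRef and the sink's Future.
    val (ref, result) =
      weatherSource.filter(!_.raining).toMat(Sink.seq)(Keep.both).run()

    ref ! Weather("90210", 72.0, false)
    ref ! Weather("02139", 32.0, true)
    ref ! StreamDone // drain buffered elements, then complete

    try Await.result(result, 3.seconds)
    finally system.terminate()
  }
}
```

Calling WeatherDemo.run() materializes the stream once and returns whatever passed the filter. Blocking with Await is only there to keep the sketch short.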

Noah answered Oct 12 '22 13:10


As @Noah points out, akka-streams is experimental, so his answer might not work with the 1.0 release. I had to follow the approach from this example:

    implicit val materializer = ActorMaterializer()

    val (actorRef: ActorRef, publisher: Publisher[TweetInfo]) =
      Source.actorRef[TweetInfo](1000, OverflowStrategy.fail)
        .toMat(Sink.publisher)(Keep.both)
        .run()

    actorRef ! TweetInfo(...)

    val source: Source[TweetInfo, Unit] = Source[TweetInfo](publisher)
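
In later releases the same "get the ActorRef and still have a Source to pass around" pattern can probably be written with preMaterialize instead of going through a Publisher. The sketch below assumes Akka 2.6. The TweetInfo fields, the StreamDone marker, and the PreMaterializeDemo object are hypothetical stand-ins, since the original snippet does not show them.

```scala
import akka.actor.ActorSystem
import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PreMaterializeDemo {
  // Hypothetical payload; the original post elides the real fields.
  final case class TweetInfo(text: String)

  // Hypothetical marker message that tells the source to complete.
  case object StreamDone

  def run(): Seq[TweetInfo] = {
    // Assumption: Akka 2.6, where the implicit system provides the materializer.
    implicit val system: ActorSystem = ActorSystem("tweets")

    // preMaterialize yields the ActorRef plus a Source that can be wired up later.
    val (actorRef, source) = Source.actorRef[TweetInfo](
      completionMatcher = { case StreamDone => CompletionStrategy.draining },
      failureMatcher = PartialFunction.empty[Any, Throwable],
      bufferSize = 1000,
      overflowStrategy = OverflowStrategy.fail
    ).preMaterialize()

    // Attach a consumer first, then feed the stream through the ActorRef.
    val result = source.runWith(Sink.seq)

    actorRef ! TweetInfo("hello")
    actorRef ! TweetInfo("world")
    actorRef ! StreamDone

    try Await.result(result, 3.seconds)
    finally system.terminate()
  }
}
```

The ActorRef can be handed to whatever produces the messages, while the returned Source goes to the code that builds the rest of the stream.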
Thien answered Oct 12 '22 14:10
