It's possible to create sources and sinks from actors using Source.actorPublisher()
and Sink.actorSubscriber()
methods respectively. But is it possible to create a Flow
from actor?
Conceptually there doesn't seem to be a good reason not to, given that it implements both ActorPublisher
and ActorSubscriber
traits, but unfortunately, the Flow
object doesn't have any method for doing this. In this excellent blog post it's done in an earlier version of Akka Streams, so the question is if it's possible also in the latest (2.4.9) version.
Companion object Flow A Flow is a set of stream processing steps that has one open input and one open output. Source Flow.scala.
Unlike heavier “streaming data processing” frameworks, Akka Streams are neither “deployed” nor automatically distributed.
The graph DSL provides GraphDSL. Builder to create the nodes in the graph, and a ~> method is used to connect nodes together, much like the via method. A node in a graph is of type Graph , which could be confusing when referring to a part of the graph, so we'll use the term “node” instead in certain cases.
I'm part of the Akka team and would like to use this question to clarify a few things about the raw Reactive Streams interfaces. I hope you'll find this useful.
Most notably, we'll be posting multiple posts on the Akka team blog about building custom stages, including Flows, soon, so keep an eye on it.
Don't use ActorPublisher / ActorSubscriber
Please don't use ActorPublisher
and ActorSubscriber
. They're too low level and you might end up implementing them in such a way that's violating the Reactive Streams specification. They're a relict of the past and even then were only "power-user mode only". There really is no reason to use those classes nowadays. We never provided a way to build a flow because the complexity is simply explosive if it was exposed as "raw" Actor API for you to implement and get all the rules implemented correctly.
If you really really want to implement raw ReactiveStreams interfaces, then please do use the Specification's TCK to verify your implementation is correct. You will likely be caught off guard by some of the more complex corner cases a Flow
(or in RS terminology a Processor
has to handle).
Most operations are possible to build without going low-level
Many flows you should be able to simply build by building from a Flow[T]
and adding the needed operations onto it, just as an example:
val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt)
Which is a reusable description of the Flow.
Since you're asking about power user mode, this is the most powerful operator on the DSL itself: statefulFlatMapConcat
. The vast majority of operations operating on plain stream elements is expressable using it: Flow.statefulMapConcat[T](f: () ⇒ (Out) ⇒ Iterable[T]): Repr[T]
.
If you need timers you could zip
with a Source.timer
etc.
GraphStage is the simplest and safest API to build custom stages
Instead, building Sources/Flows/Sinks has its own powerful and safe API: the GraphStage
. Please read the documentation about building custom GraphStages (they can be a Sink/Source/Flow or even any arbitrary shape). It handles all of the complex Reactive Streams rules for you, while giving you full freedom and type-safety while implementing your stages (which could be a Flow).
For example, taken from the docs, is an GraphStage implementation of the filter(T => Boolean)
operator:
class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("Filter.in")
val out = Outlet[A]("Filter.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (p(elem)) push(out, elem)
else pull(in)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
It also handles asynchronous channels and is fusable by default.
In addition to the docs, these blog posts explain in detail why this API is the holy grail of building custom stages of any shape:
Konrad's solution demonstrates how to create a custom stage that utilizes Actors, but in most cases I think that is a bit overkill.
Usually you have some Actor that is capable of responding to questions:
val actorRef : ActorRef = ???
type Input = ???
type Output = ???
val queryActor : Input => Future[Output] =
(actorRef ? _) andThen (_.mapTo[Output])
This can be easily utilized with basic Flow
functionality which takes in the maximum number of concurrent requests:
val actorQueryFlow : Int => Flow[Input, Output, _] =
(parallelism) => Flow[Input].mapAsync[Output](parallelism)(queryActor)
Now actorQueryFlow
can be integrated into any stream...
Here is a solution build by using a graph stage. The actor has to acknowledge all messages in order to have back-pressure. The actor is notified when the stream fails/completes and the stream fails when the actor terminates. This can be useful if you don't want to use ask, e.g. when not every input message has a corresponding output message.
import akka.actor.{ActorRef, Status, Terminated}
import akka.stream._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
object ActorRefBackpressureFlowStage {
case object StreamInit
case object StreamAck
case object StreamCompleted
case class StreamFailed(ex: Throwable)
case class StreamElementIn[A](element: A)
case class StreamElementOut[A](element: A)
}
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
* First element is always `StreamInit`, then stream is waiting for acknowledgement message
* `ackMessage` from the given actor which means that it is ready to process
* elements. It also requires `ackMessage` message after each stream element
* to make backpressure work. Stream elements are wrapped inside `StreamElementIn(elem)` messages.
*
* The target actor can emit elements at any time by sending a `StreamElementOut(elem)` message, which will
* be emitted downstream when there is demand.
*
* If the target actor terminates the stage will fail with a WatchedActorTerminatedException.
* When the stream is completed successfully a `StreamCompleted` message
* will be sent to the destination actor.
* When the stream is completed with failure a `StreamFailed(ex)` message will be send to the destination actor.
*/
class ActorRefBackpressureFlowStage[In, Out](private val flowActor: ActorRef) extends GraphStage[FlowShape[In, Out]] {
import ActorRefBackpressureFlowStage._
val in: Inlet[In] = Inlet("ActorFlowIn")
val out: Outlet[Out] = Outlet("ActorFlowOut")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private lazy val self = getStageActor {
case (_, StreamAck) =>
if(firstPullReceived) {
if (!isClosed(in) && !hasBeenPulled(in)) {
pull(in)
}
} else {
pullOnFirstPullReceived = true
}
case (_, StreamElementOut(elemOut)) =>
val elem = elemOut.asInstanceOf[Out]
emit(out, elem)
case (_, Terminated(targetRef)) =>
failStage(new WatchedActorTerminatedException("ActorRefBackpressureFlowStage", targetRef))
case (actorRef, unexpected) =>
failStage(new IllegalStateException(s"Unexpected message: `$unexpected` received from actor `$actorRef`."))
}
var firstPullReceived: Boolean = false
var pullOnFirstPullReceived: Boolean = false
override def preStart(): Unit = {
//initialize stage actor and watch flow actor.
self.watch(flowActor)
tellFlowActor(StreamInit)
}
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elementIn = grab(in)
tellFlowActor(StreamElementIn(elementIn))
}
override def onUpstreamFailure(ex: Throwable): Unit = {
tellFlowActor(StreamFailed(ex))
super.onUpstreamFailure(ex)
}
override def onUpstreamFinish(): Unit = {
tellFlowActor(StreamCompleted)
super.onUpstreamFinish()
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if(!firstPullReceived) {
firstPullReceived = true
if(pullOnFirstPullReceived) {
if (!isClosed(in) && !hasBeenPulled(in)) {
pull(in)
}
}
}
}
override def onDownstreamFinish(): Unit = {
tellFlowActor(StreamCompleted)
super.onDownstreamFinish()
}
})
private def tellFlowActor(message: Any): Unit = {
flowActor.tell(message, self.ref)
}
}
override def shape: FlowShape[In, Out] = FlowShape(in, out)
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With