My stream has a Flow whose outputs are List[Any] objects. I want to follow it with a mapAsync and some other stages, each of which processes an individual element instead of the whole list. How can I do that?
Effectively I want to connect the output of
    Flow[Any].map { msg =>
      someListDerivedFrom(msg)
    }
to be consumed by
    Flow[Any].mapAsyncUnordered(4) { listElement =>
      actorRef ? listElement
    }.someOtherStuff
How do I do this?
Akka Streams has three major components: Source, Flow, and Sink. Any non-cyclical stream consists of at least two of them, a Source and a Sink, plus any number of Flow elements. Source and Sink can be regarded as special cases of Flow. Source: this is the source of data. It has exactly one output.
Source[String, akka.NotUsed] = ... Sink: this is the exit point of your stream. There must be at least one in every stream. The Sink is the last element of the stream; it is a subscriber of the data sent or processed by a Source.
Actor Materializer Lifecycle. The Materializer is a component that is responsible for turning the stream blueprint into a running stream and emitting the “materialized value”.
Unlike heavier “streaming data processing” frameworks, Akka Streams are neither “deployed” nor automatically distributed.
I think the combinator you are looking for is mapConcat. This combinator takes a function that maps each input element to an Iterable, and it emits the elements of that Iterable downstream one at a time. A simple example would be as follows:
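    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Sink, Source}

    implicit val system = ActorSystem()
    implicit val mater = ActorMaterializer()
    // Each element emitted by the source is a List[Int].
    val source = Source(List(List(1, 2, 3), List(4, 5, 6)))
    // The sink consumes individual Ints.
    val sink = Sink.foreach[Int](println)
    // mapConcat(identity) flattens each List into its elements.
    val graph =
      source
        .mapConcat(identity)
        .to(sink)
    graph.run()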
Here, my Source is spitting out List elements, and my Sink accepts the underlying type of what's in those Lists. I can't connect them directly together as the types are different. But if I apply mapConcat between them, they can be connected, as that combinator will flatten those List elements out, sending their individual elements (Int) downstream. Because the input element to mapConcat is already an Iterable, you only need to use the identity function in the body of mapConcat to make things work.
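Applied to the pipeline in the question, the same idea might look like the sketch below. It is only an illustration under some assumptions: the messages are taken to be Strings, someListDerivedFrom is a made-up implementation that splits on commas, and a hypothetical process function returning a Future stands in for actorRef ? listElement so the example runs without an actor.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapConcatThenAsync {
  // Hypothetical stand-in for someListDerivedFrom(msg) from the question.
  def someListDerivedFrom(msg: String): List[String] =
    msg.split(",").toList

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("mapconcat-demo")
    implicit val mater: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Hypothetical stand-in for `actorRef ? listElement`.
    def process(element: String): Future[Int] = Future(element.length)

    // mapConcat derives the list and flattens it in one step, so the
    // next stage receives individual elements rather than whole lists.
    val flow = Flow[String]
      .mapConcat(msg => someListDerivedFrom(msg))
      .mapAsyncUnordered(4)(element => process(element))

    val result = Source(List("a,bb", "ccc")).via(flow).runWith(Sink.seq)
    try Await.result(result, 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run().sorted)
}
```

Using mapConcat in place of the original map means no separate flattening stage is needed. Because mapAsyncUnordered does not preserve ordering, the results are sorted before printing.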