
Akka stream - List to mapAsync of individual elements

My stream has a Flow whose outputs are List[Any] objects. I want to follow it with a mapAsync and some other stages, each of which processes an individual element instead of the whole list. How can I do that?

Effectively I want to connect the output of

Flow[Any].map { msg =>
  someListDerivedFrom(msg)
}

to be consumed by:

Flow[Any].mapAsyncUnordered(4) { listElement =>
  actorRef ? listElement
}.someOtherStuff

How do I do this?

anindyaju99 asked Jul 18 '16 17:07




1 Answer

I think the combinator you are looking for is mapConcat. It applies a function that returns an Iterable to each input element and emits the elements of that Iterable one by one downstream. A simple example would be as follows:

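import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

implicit val system = ActorSystem()
implicit val mater = ActorMaterializer()

// The source emits whole lists; the sink expects individual Ints.
val source = Source(List(List(1,2,3), List(4,5,6)))
val sink = Sink.foreach[Int](println)

// mapConcat(identity) flattens each List[Int] into its elements.
val graph =
  source.
    mapConcat(identity).
    to(sink)
graph.run()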

Here, my Source emits List elements, and my Sink accepts the element type of those Lists. I can't connect them directly because the types differ. If I apply mapConcat between them, they can be connected: the combinator flattens each List and sends its individual elements (Int) downstream. Because the input element to mapConcat is already an Iterable, you only need to use the identity function in the body of mapConcat to make things work.
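To tie this back to the shape in the question, here is a minimal sketch that puts mapConcat directly in front of mapAsyncUnordered(4). Everything specific in it is an assumption made for illustration: someListDerivedFrom splits a comma-separated String, and process is a plain Future standing in for `actorRef ? listElement`, so the example runs without an actor. It uses ActorMaterializer as in the example above; on Akka 2.6 and later that call is deprecated and the implicit ActorSystem alone should be enough.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MapConcatThenMapAsync {

  // Hypothetical stand-in for the question's someListDerivedFrom(msg).
  def someListDerivedFrom(msg: String): List[String] =
    msg.split(",").toList

  // Hypothetical stand-in for `actorRef ? listElement`: any function
  // that returns a Future fits mapAsync / mapAsyncUnordered.
  def process(element: String)(implicit ec: ExecutionContext): Future[Int] =
    Future(element.trim.toInt * 10)

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("example")
    implicit val mater: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val result: Future[Seq[Int]] =
      Source(List("1,2,3", "4,5,6"))
        // One message in, many individual elements out.
        .mapConcat(msg => someListDerivedFrom(msg))
        // Each element is processed on its own, up to 4 at a time.
        .mapAsyncUnordered(4)(element => process(element))
        .runWith(Sink.seq)

    try Await.result(result, 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    // Sorted because mapAsyncUnordered does not preserve input order.
    println(run().sorted)
}
```

Since mapConcat accepts any function that returns an Iterable, the separate map stage from the question can be folded into it, as done here. If downstream stages need the results in the original order, mapAsync(4) can be used in place of mapAsyncUnordered(4).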

cmbaxter answered Oct 04 '22 04:10