Can anyone please explain the difference between map and mapAsync with respect to Akka Streams? The documentation says that:
Stream transformations and side effects involving external non-stream based services can be performed with mapAsync or mapAsyncUnordered
Why can't we simply use map here? I assume that Flow, Source and Sink are all monadic in nature, so shouldn't map work fine with respect to the delay inherent in these?
Signature
The difference is best highlighted in the signatures: Flow.map takes in a function that returns a type T, while Flow.mapAsync takes in a function that returns a type Future[T].
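A minimal sketch, assuming Akka 2.6 or later (where an implicit ActorSystem also supplies the Materializer), that makes the signature difference visible in the element types. The object SignatureDemo and its members are illustrative names only.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SignatureDemo {
  // map with a plain function Int => Int: the Flow emits Int
  val plainMap: Flow[Int, Int, NotUsed] = Flow[Int].map(i => i + 1)

  // map with a Future-returning function: the Flow emits Future[Int]
  val mapOfFuture: Flow[Int, Future[Int], NotUsed] =
    Flow[Int].map(i => Future.successful(i + 1))

  // mapAsync with the same function: the Flow emits Int, because mapAsync
  // waits for each Future and emits the value it completed with
  val asyncMap: Flow[Int, Int, NotUsed] =
    Flow[Int].mapAsync(4)(i => Future.successful(i + 1))

  // Runs 1, 2, 3 through each flow and collects what comes out
  def run(): (Seq[Int], Seq[Future[Int]], Seq[Int]) = {
    // Akka 2.6+: an implicit ActorSystem also provides the Materializer
    implicit val system: ActorSystem = ActorSystem("signature-demo")
    def collect[T](flow: Flow[Int, T, NotUsed]): Seq[T] =
      Await.result(Source(1 to 3).via(flow).runWith(Sink.seq), 5.seconds)
    try {
      (collect(plainMap), collect(mapOfFuture), collect(asyncMap))
    } finally {
      system.terminate()
    }
  }
}
```

Note that map with a Future-returning function compiles fine; it simply produces a stream of Futures instead of a stream of values.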
Practical Example
As an example, suppose that we have a function which queries a database for a user's full name based on a user id:
type UserID = String
type FullName = String
val databaseLookup : UserID => FullName = ??? //implementation unimportant
Given an Akka stream Source of UserID values, we could use Flow.map within a stream to query the database and print the full names to the console:
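import akka.stream.scaladsl.{Flow, Sink, Source}
// .run() needs an implicit Materializer in scope (in Akka 2.6+, an implicit ActorSystem is enough)
val userIDSource : Source[UserID, _] = ???
val stream =
  userIDSource.via(Flow[UserID].map(databaseLookup))
    .to(Sink.foreach[FullName](println))
    .run()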
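One limitation of this approach is that this stream will only make one database query at a time, because map calls databaseLookup synchronously and waits for it to return before handling the next element. This serial querying will be a "bottleneck" and will likely prevent maximum throughput in our stream.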
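To see the serialization concretely, here is a sketch under stated assumptions: databaseLookup is replaced by a hypothetical stand-in that sleeps 20 ms to mimic query latency and counts how many calls overlap. It assumes Akka 2.6 or later; SerialMapDemo and the user-N ids are made up for the example.

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SerialMapDemo {
  type UserID = String
  type FullName = String

  // Returns the looked-up names and the highest number of overlapping lookups
  def run(): (Seq[FullName], Int) = {
    implicit val system: ActorSystem = ActorSystem("serial-map-demo")
    val inFlight = new AtomicInteger(0)
    val maxInFlight = new AtomicInteger(0)

    // Hypothetical stand-in for the real, blocking database query
    val databaseLookup: UserID => FullName = { id =>
      val now = inFlight.incrementAndGet()
      maxInFlight.accumulateAndGet(now, (a, b) => math.max(a, b))
      Thread.sleep(20) // simulated query latency
      inFlight.decrementAndGet()
      s"Full name of $id"
    }

    val userIDSource = Source(1 to 20).map(i => s"user-$i")
    try {
      val names = Await.result(
        userIDSource.via(Flow[UserID].map(databaseLookup)).runWith(Sink.seq),
        10.seconds)
      (names, maxInFlight.get())
    } finally {
      system.terminate()
    }
  }
}
```

Because map invokes the function and waits for it to return before taking the next element, the recorded maximum overlap is 1, and the 20 lookups take at least 20 × 20 ms in total.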
We could try to improve performance through concurrent queries using a Future:
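import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global // Future { ... } and foreach need an ExecutionContext
def concurrentDBLookup(userID : UserID) : Future[FullName] =
  Future { databaseLookup(userID) }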
val concurrentStream =
  userIDSource.via(Flow[UserID].map(concurrentDBLookup))
    .to(Sink.foreach[Future[FullName]](_ foreach println))
    .run()
The problem with this simplistic change is that we have effectively eliminated backpressure.
The Sink just pulls in each Future and attaches a foreach println, which is fast compared to a database query. The stream therefore keeps propagating demand to the Source and spawning more Futures inside the Flow.map. As a result, there is no limit to the number of databaseLookup calls running concurrently, and unfettered parallel querying could eventually overload the database.
Flow.mapAsync to the rescue: we can have concurrent database access while at the same time capping the number of simultaneous lookups:
val maxLookupCount = 10
val maxLookupConcurrentStream =
  userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
    .to(Sink.foreach[FullName](println))
    .run()
Also notice that the Sink.foreach got simpler: it no longer takes in a Future[FullName], just a FullName.
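The backpressure argument can be checked with a rough experiment. This sketch assumes Akka 2.6 or later and uses a hypothetical TrackedLookup that simulates a 20 ms query with akka.pattern.after (so no thread is blocked) and records the highest number of lookups that were started but not yet completed. It runs 100 made-up user ids first through map plus Future, then through mapAsync(10).

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BoundedLookupDemo {
  type UserID = String
  type FullName = String

  // Hypothetical non-blocking lookup that records how many calls overlap.
  // A call counts as "in flight" from invocation until its Future completes.
  final class TrackedLookup(system: ActorSystem)(implicit ec: ExecutionContext) {
    private val inFlight = new AtomicInteger(0)
    private val maxInFlight = new AtomicInteger(0)

    def apply(id: UserID): Future[FullName] = {
      val now = inFlight.incrementAndGet()
      maxInFlight.accumulateAndGet(now, (a, b) => math.max(a, b))
      // Simulated 20 ms latency, scheduled instead of blocking a thread;
      // andThen completes the returned Future only after the decrement runs
      after(20.millis, system.scheduler)(Future.successful(s"Full name of $id"))
        .andThen { case _ => inFlight.decrementAndGet() }
    }

    def maxConcurrent: Int = maxInFlight.get()
  }

  // Returns (max overlap with map + Future, max overlap with mapAsync, mapAsync output)
  def run(maxLookupCount: Int = 10): (Int, Int, Seq[FullName]) = {
    implicit val system: ActorSystem = ActorSystem("bounded-lookup-demo")
    implicit val ec: ExecutionContext = system.dispatcher
    val userIDSource = Source(1 to 100).map(i => s"user-$i")
    try {
      // map + Future: the stream keeps starting lookups without waiting for them
      val unbounded = new TrackedLookup(system)
      val futures = Await.result(
        userIDSource.via(Flow[UserID].map(id => unbounded(id))).runWith(Sink.seq),
        10.seconds)
      Await.result(Future.sequence(futures), 10.seconds)

      // mapAsync: at most maxLookupCount lookups are in flight at once
      val bounded = new TrackedLookup(system)
      val names = Await.result(
        userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(id => bounded(id))).runWith(Sink.seq),
        10.seconds)

      (unbounded.maxConcurrent, bounded.maxConcurrent, names)
    } finally {
      system.terminate()
    }
  }
}
```

The mapAsync figure cannot exceed 10, and its output keeps the input order. The map-plus-Future figure depends on timing, so the sketch only reports it; nothing in the stream limits it, and it will usually be much larger.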
Unordered Async Map
If you don't need to preserve the ordering of the UserIDs in the resulting FullNames, you can use Flow.mapAsyncUnordered instead, which emits each result as soon as its Future completes. For example, you might only need to print all of the names to the console without caring about the order in which they are printed.
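A small sketch of the ordering difference, assuming Akka 2.6 or later. The hypothetical slowLookup makes later ids finish sooner, so completion order is the reverse of input order; UnorderedDemo is likewise an illustrative name.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object UnorderedDemo {
  // Returns (output of mapAsync, output of mapAsyncUnordered) for the same input
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("unordered-demo")
    implicit val ec: ExecutionContext = system.dispatcher

    // Hypothetical lookup whose latency shrinks as the id grows:
    // id 1 takes 300 ms, id 2 takes 200 ms, id 3 takes 100 ms
    def slowLookup(id: Int): Future[Int] =
      after(100.millis * (4 - id), system.scheduler)(Future.successful(id))

    val ids = Source(1 to 3)
    try {
      val ordered = Await.result(
        ids.via(Flow[Int].mapAsync(3)(slowLookup)).runWith(Sink.seq), 5.seconds)
      val unordered = Await.result(
        ids.via(Flow[Int].mapAsyncUnordered(3)(slowLookup)).runWith(Sink.seq), 5.seconds)
      (ordered, unordered)
    } finally {
      system.terminate()
    }
  }
}
```

mapAsync still emits 1, 2, 3 because it waits for the slow first lookup, while mapAsyncUnordered emits results as they become ready and will normally produce 3, 2, 1. Both keep at most 3 lookups in flight.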