I am trying to understand the difference between asyncBoundary and mapAsync. At a glance, I would guess they should be the same. However, when I run the code below, the asyncBoundary version seems to be faster than the mapAsync version.
Here is the code:
implicit val system = ActorSystem("sourceDemo")
implicit val materializer = ActorMaterializer()
import system.dispatcher // implicit ExecutionContext needed by the Future { ... } calls below
Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()
Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()
The result: the async boundary version always finishes sooner than the mapAsync version.
From the documentation on asyncBoundary (https://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-flows-and-basics.html), I can see that it runs stages on different CPUs, whereas mapAsync achieves multi-threading by using Futures. But Futures are also asynchronous.
Could someone clarify the difference between these two APIs?
Async
As you correctly point out, this forces the insertion of an asynchronous boundary between two stages. In your example
Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()
this practically means that the + 1 operation and the * 2 operation will be run by separate actors. This enables pipelining: while one element moves on to the * 2 stage, another element can be brought into the + 1 stage at the same time. If you don't force an async boundary there, both operations are fused into the same actor, which runs them sequentially: it performs both operations on one element before requesting a new one from upstream.
By the way, your example can be written more concisely using the async combinator:
Source(1 to 100).map(_ + 1).async.map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()
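A minimal sketch of the pipelining effect, assuming akka-stream 2.6.x on Scala 2.13 (where an implicit ActorSystem is enough to materialize a stream, so no ActorMaterializer is needed). The names AsyncBoundaryDemo, slow and run are hypothetical, and Thread.sleep stands in for expensive per-element work purely for illustration. Without the boundary each element costs about 100 ms end to end, so 10 elements take roughly a second; with .async the two 50 ms stages overlap, so the run should take noticeably less, although exact timings depend on the machine.

```scala
// Sketch assuming akka-stream 2.6.x (Scala 2.13) is on the classpath.
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncBoundaryDemo {
  // Simulates an expensive step by blocking the stage's thread
  // (illustration only: real code should not block inside a stream stage).
  private def slow(ms: Long)(f: Int => Int): Int => Int = { x =>
    Thread.sleep(ms)
    f(x)
  }

  /** Sends 10 elements through two 50 ms stages; returns (results, elapsed milliseconds). */
  def run(withBoundary: Boolean)(implicit system: ActorSystem): (Seq[Int], Long) = {
    val plusOne = Source(1 to 10).map(slow(50)(_ + 1))
    // `.async` runs everything upstream of this point in its own actor, so the
    // `+ 1` stage and the `* 2` stage can work on different elements at the same time.
    val source = if (withBoundary) plusOne.async else plusOne
    val start = System.nanoTime()
    val results = Await.result(source.map(slow(50)(_ * 2)).runWith(Sink.seq), 10.seconds)
    (results, (System.nanoTime() - start) / 1000000)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("asyncBoundaryDemo")
    try {
      val (fused, fusedMs) = run(withBoundary = false)
      val (piped, pipedMs) = run(withBoundary = true)
      println(s"fused (one actor):   $fusedMs ms, $fused")
      println(s"with async boundary: $pipedMs ms, $piped")
    } finally system.terminate()
  }
}
```

Both runs produce the same elements in the same order; only the elapsed time differs, because the boundary lets the two stages overlap.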
mapAsync
This stage parallelises the execution of asynchronous operations. The parallelism factor specifies the maximum number of Futures that may be in flight at the same time; the Futures themselves run on whichever ExecutionContext they were created with, and no extra actors are spun up per element. The mapAsync stage tracks the results of these concurrent computations and emits them in the original upstream order.
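To see the ordering guarantee in action, here is a sketch (same akka-stream 2.6.x assumption; OrderingDemo and delayed are hypothetical names) in which later elements complete first. akka.pattern.after completes each Future after a delay without blocking a thread. For contrast it also uses mapAsyncUnordered, a sibling operator not discussed above, which emits each result as soon as its Future completes instead of in upstream order.

```scala
// Sketch assuming akka-stream 2.6.x (Scala 2.13) is on the classpath.
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object OrderingDemo {
  // Element i completes after (5 - i) * 50 ms, so element 4 finishes first and element 1 last.
  private def delayed(i: Int)(implicit system: ActorSystem): Future[Int] = {
    import system.dispatcher
    after(((5 - i) * 50).millis, system.scheduler)(Future.successful(i * 10))
  }

  // mapAsync: up to 4 Futures in flight, results emitted in upstream order.
  def ordered(implicit system: ActorSystem): Seq[Int] =
    Await.result(Source(1 to 4).mapAsync(4)(i => delayed(i)).runWith(Sink.seq), 5.seconds)

  // mapAsyncUnordered: results emitted in completion order.
  def unordered(implicit system: ActorSystem): Seq[Int] =
    Await.result(Source(1 to 4).mapAsyncUnordered(4)(i => delayed(i)).runWith(Sink.seq), 5.seconds)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("orderingDemo")
    try {
      println(s"mapAsync:          $ordered")
      println(s"mapAsyncUnordered: $unordered")
    } finally system.terminate()
  }
}
```

The mapAsync run always yields 10, 20, 30, 40 even though the Futures finish in reverse; the unordered run typically yields them in completion order instead.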
In your example
Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()
potentially up to 100 of the + 1 operations (i.e. all of them) could be run in parallel, with the results collected in order. Likewise, up to 100 of the * 2 operations could be run in parallel in the second stage, again with the results collected in order and emitted downstream.
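The parallelism argument is an upper bound on outstanding Futures, not a number of threads or actors. The sketch below (again assuming akka-stream 2.6.x; ParallelismDemo and peakInFlight are hypothetical names) counts how many Futures are running at once and reports the peak, which should never exceed the parallelism value.

```scala
// Sketch assuming akka-stream 2.6.x (Scala 2.13) is on the classpath.
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ParallelismDemo {
  /** Runs n elements through mapAsync(parallelism); returns the peak number of Futures in flight. */
  def peakInFlight(n: Int, parallelism: Int)(implicit system: ActorSystem): Int = {
    import system.dispatcher
    val inFlight = new AtomicInteger(0)
    val peak = new AtomicInteger(0)
    val done = Source(1 to n)
      .mapAsync(parallelism) { t =>
        // Count this Future as started and remember the highest count seen so far.
        peak.accumulateAndGet(inFlight.incrementAndGet(), (a, b) => math.max(a, b))
        // Non-blocking 20 ms delay standing in for real work; the counter is
        // decremented before mapAsync observes the returned Future completing.
        after(20.millis, system.scheduler)(Future.successful(t + 1))
          .andThen { case _ => inFlight.decrementAndGet() }
      }
      .runWith(Sink.ignore)
    Await.result(done, 10.seconds)
    peak.get()
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("parallelismDemo")
    try println(s"peak Futures in flight: ${peakInFlight(n = 100, parallelism = 4)}")
    finally system.terminate()
  }
}
```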
In your example you are running quick, CPU-bound operations that don't justify using mapAsync: most likely the overhead this stage brings (creating and scheduling a Future per element, and buffering results to keep them in order) is much greater than the benefit of running 100 of these operations in parallel. mapAsync is particularly useful for slow, IO-bound operations, where parallelisation really pays off.
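Conversely, here is a sketch of a case where mapAsync does pay off: a slow, non-blocking call. fakeRemoteCall is a hypothetical stand-in for real IO (such as an HTTP request), implemented with akka.pattern.after so that no thread is blocked while waiting, and akka-stream 2.6.x is assumed. With parallelism 1 the 20 calls run one after another (about 2 seconds in total); with parallelism 10 up to ten are in flight at once, so the whole run should take a small fraction of that.

```scala
// Sketch assuming akka-stream 2.6.x (Scala 2.13) is on the classpath.
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object IoBoundDemo {
  // Hypothetical stand-in for a slow IO call: the result arrives after 100 ms
  // and no thread is blocked while waiting for it.
  private def fakeRemoteCall(x: Int)(implicit system: ActorSystem): Future[Int] = {
    import system.dispatcher
    after(100.millis, system.scheduler)(Future.successful(x * 2))
  }

  /** Processes 20 elements with the given parallelism; returns (results, elapsed ms). */
  def timed(parallelism: Int)(implicit system: ActorSystem): (Seq[Int], Long) = {
    val start = System.nanoTime()
    val results = Await.result(
      Source(1 to 20).mapAsync(parallelism)(x => fakeRemoteCall(x)).runWith(Sink.seq),
      30.seconds)
    (results, (System.nanoTime() - start) / 1000000)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("ioBoundDemo")
    try {
      val (_, sequentialMs) = timed(parallelism = 1)
      val (_, parallelMs) = timed(parallelism = 10)
      println(s"parallelism 1: $sequentialMs ms, parallelism 10: $parallelMs ms")
    } finally system.terminate()
  }
}
```

For cheap, CPU-bound functions like + 1 and * 2, a plain map (optionally split with .async) is usually the better fit.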
For a comprehensive read on this topic, check out this blogpost.