akka stream asyncBoundary vs mapAsync

I am trying to understand the difference between asyncBoundary and mapAsync. At first glance, I assumed they would be the same. However, when I run the code, the asyncBoundary version appears to be faster than the mapAsync one.

Here is the code

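import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

implicit val system = ActorSystem("sourceDemo")
implicit val materializer = ActorMaterializer()
import system.dispatcher // ExecutionContext required by Future { ... }

Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()
Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()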

The output: the async boundary version always finishes before the mapAsync version.

From the documentation on asyncBoundary (https://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-flows-and-basics.html), I can see that it lets stages run on different CPUs, whereas mapAsync is multi-threaded through Futures. But Futures are asynchronous too.

Could someone clarify the difference between these two APIs?

Xiaohe Dong asked Nov 15 '17 07:11


1 Answer

Async

As you correctly point out, this forces the insertion of an asynchronous boundary between two stages. In your example

Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()

this means in practice that the + 1 operation and the * 2 operation will be run by separate actors. This enables pipelining: while one element moves on to the * 2 stage, another element can be brought in for the + 1 stage at the same time. If you don't force an async boundary there, both stages are fused into the same actor, which sequentialises the operations: it performs both of them on one element before requesting a new one from upstream.
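To see the pipelining effect, here is a minimal sketch that times the same two-stage pipeline with and without `.async`. It assumes the Akka 2.5-era API used in the question (`ActorMaterializer`, which is deprecated in newer Akka versions); the object `AsyncBoundaryTiming` and the helper `slow` are hypothetical names, and the 5 ms `Thread.sleep` is an arbitrary stand-in for real per-element work. With the boundary, the two stages can overlap, so you would expect the elapsed time to drop noticeably, but the exact numbers depend on the machine.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncBoundaryTiming {
  // Hypothetical helper: wraps a function so each call takes ~5 ms (stand-in for real work).
  def slow(f: Int => Int): Int => Int = x => { Thread.sleep(5); f(x) }

  // Runs 100 elements through `+ 1` then `* 2`, optionally with an async boundary
  // between the two stages; returns (elapsed millis, emitted elements).
  def runOnce(withBoundary: Boolean)(implicit mat: ActorMaterializer): (Long, Seq[Int]) = {
    val first = Source(1 to 100).map(slow(_ + 1))
    val upstream = if (withBoundary) first.async else first
    val start = System.nanoTime()
    val out = Await.result(upstream.map(slow(_ * 2)).runWith(Sink.seq), 30.seconds)
    ((System.nanoTime() - start) / 1000000L, out)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("pipelining")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      runOnce(withBoundary = false) // warm-up run, result discarded
      val (fusedMs, fusedOut) = runOnce(withBoundary = false)
      val (asyncMs, asyncOut) = runOnce(withBoundary = true)
      println(s"fused: $fusedMs ms, with .async: $asyncMs ms, same output: ${fusedOut == asyncOut}")
    } finally {
      system.terminate()
    }
  }
}
```

Note that the output is identical either way; the boundary only changes how the work is scheduled.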

By the way, your example can be rewritten in a shorter format, using the async combinator:

Source(1 to 100).map(_ + 1).async.map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()

mapAsync

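This is a stage for running asynchronous operations in parallel. The parallelism factor sets the maximum number of Futures that may be in flight at the same time; no extra actors are spun up, as the Futures run on whatever ExecutionContext they were created with. The results of the parallel computations are tracked and emitted downstream by the mapAsync stage in the order of the incoming elements, even if the Futures complete out of order.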
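The ordering guarantee is easiest to see with Futures that deliberately complete in reverse order. This is a sketch under the same Akka 2.5-era assumptions as the question; `MapAsyncOrdering` and `slowCall` are hypothetical names, and the delays are arbitrary. It uses `akka.pattern.after` to complete each Future later without blocking a thread, and contrasts `mapAsync` with `mapAsyncUnordered`, which emits results as soon as they are ready.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncOrdering {
  // Runs the same source through mapAsync and mapAsyncUnordered and returns both outputs.
  def run()(implicit system: ActorSystem, mat: ActorMaterializer): (Seq[Int], Seq[Int]) = {
    import system.dispatcher // ExecutionContext required by `after`

    // Hypothetical non-blocking call: element n completes after (10 - n) * 20 ms,
    // so later elements finish before earlier ones.
    def slowCall(n: Int): Future[Int] =
      after(((10 - n) * 20).millis, system.scheduler)(Future.successful(n))

    val ordered = Await.result(Source(1 to 9).mapAsync(9)(slowCall).runWith(Sink.seq), 10.seconds)
    val unordered = Await.result(Source(1 to 9).mapAsyncUnordered(9)(slowCall).runWith(Sink.seq), 10.seconds)
    (ordered, unordered)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("ordering")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val (ordered, unordered) = run()
      println(s"mapAsync:          $ordered")   // input order, regardless of completion order
      println(s"mapAsyncUnordered: $unordered") // completion order
    } finally {
      system.terminate()
    }
  }
}
```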

In your example

Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()

potentially up to 100 of the + 1 operations (i.e. all of them) could be in flight at the same time, with the results collected in order. Likewise, up to 100 of the * 2 operations could run in parallel, again with the results collected in order before being emitted downstream.
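The parallelism argument is an upper bound, not a guarantee. The following sketch (Akka 2.5-era API assumed; `MapAsyncParallelism`, `peakInFlight` and `tracked` are hypothetical names) counts how many Futures are outstanding at once and records the peak. The counter is decremented inside `andThen`, whose returned Future completes only after that callback runs, so mapAsync cannot free a slot before the decrement and the observed peak should never exceed the parallelism you pass in.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncParallelism {
  // Returns the largest number of Futures observed in flight at the same time
  // while 100 elements pass through mapAsync(parallelism).
  def peakInFlight(parallelism: Int)(implicit system: ActorSystem, mat: ActorMaterializer): Int = {
    import system.dispatcher
    val lock = new Object
    var current = 0
    var peak = 0

    def tracked(n: Int): Future[Int] = {
      lock.synchronized { current += 1; peak = math.max(peak, current) }
      // Hypothetical 10 ms non-blocking call; the decrement runs before the
      // Future returned by andThen completes.
      after(10.millis, system.scheduler)(Future.successful(n * 2))
        .andThen { case _ => lock.synchronized { current -= 1 } }
    }

    Await.result(Source(1 to 100).mapAsync(parallelism)(tracked).runWith(Sink.ignore), 30.seconds)
    lock.synchronized(peak)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("parallelism")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try println(s"peak in-flight Futures with parallelism 4: ${peakInFlight(4)}")
    finally system.terminate()
  }
}
```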

In your example you are running quick, CPU-bound operations that don't justify using mapAsync: the overhead of creating and scheduling a Future for every element is most likely much larger than the gain from running 100 of these operations in parallel. mapAsync is particularly useful for slow, IO-bound operations (e.g. a call to a remote service that returns a Future), where parallelisation pays off.
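As a rough illustration of the IO-bound case, this sketch times mapAsync over a simulated slow call at two parallelism levels. Assumptions: Akka 2.5-era API as in the question; `MapAsyncIoBound`, `fakeIo` and `elapsedMillis` are hypothetical names; the 50 ms delay, produced with `akka.pattern.after` so no thread is blocked, stands in for a real network call. With parallelism 1 the calls happen one after another, while with parallelism 10 up to ten overlap, so the second run should be several times faster. For cheap work like `_ + 1`, a plain `map` remains the better choice.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncIoBound {
  // Hypothetical IO call: completes after 50 ms without blocking a thread.
  def fakeIo(n: Int)(implicit system: ActorSystem): Future[Int] = {
    import system.dispatcher
    after(50.millis, system.scheduler)(Future.successful(n))
  }

  // Elapsed milliseconds to push 40 elements through mapAsync with the given parallelism.
  def elapsedMillis(parallelism: Int)(implicit system: ActorSystem, mat: ActorMaterializer): Long = {
    val start = System.nanoTime()
    Await.result(Source(1 to 40).mapAsync(parallelism)(n => fakeIo(n)).runWith(Sink.ignore), 30.seconds)
    (System.nanoTime() - start) / 1000000L
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("io-bound")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      println(s"parallelism 1:  ${elapsedMillis(1)} ms")
      println(s"parallelism 10: ${elapsedMillis(10)} ms")
    } finally {
      system.terminate()
    }
  }
}
```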

For a comprehensive read on this topic, check out this blog post.

Stefano Bonetti answered Oct 19 '22 22:10