
Akka Streams. Control The Number of Items Being Processed in Akka Streams At One Time

Akka Streams reduces my boilerplate code significantly and has many useful features. However, I need to be able to limit how fast items are processed. I feed links to resources to download over time (all from a single online site) into a Hazelcast queue that is attached to a source, and the number of links entering the queue can grow quite large. Ideally, no more than 50-60 requests would run at once. Is there a feature in Akka Streams that would let me limit the number of items being processed at one time?

A further constraint is that interacting with certain websites requires complex state management, code processing, and other features, so Akka HTTP cannot help here. My network code is written entirely with Jsoup and Apache HttpComponents, with an occasional call to a JavaFX-based server to render scripts.

My current attempt to control the rate of input, using a buffer as described in the docs, follows:

val sourceGraph: Graph[SourceShape[(FlowConfig, Term)], NotUsed] = new HazelcastTermSource(conf.termQueue, conf)
val source = Source.fromGraph(sourceGraph)
val (killSwitch, last) = source
  .buffer(conf.crawlStreamConf.maxCrawlConcurrency, OverflowStrategy.backpressure)
  .viaMat(new DownloadFlow())(Keep.both)
  .map(x => println(x))
  .to(Sink.ignore)
  .run()
Andrew Scott Evans asked Jun 09 '17 00:06


1 Answer

The mechanism you're looking for is mapAsync (or mapAsyncUnordered, if ordering does not need to be preserved, as in your example). These combinators take a parallelism parameter that limits the number of tasks the stage runs in parallel.
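To see the parallelism limit in isolation, here is a minimal sketch that counts how many tasks are in flight at once. It assumes Akka 2.6+ (where an implicit ActorSystem supplies the materializer) and Scala 2.12+; `fakeDownload`, the pool size, and the limit of 4 are made up for illustration, with a short sleep standing in for a blocking Jsoup or HttpComponents call.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object ParallelismDemo {
  /** Returns (peak number of concurrent tasks, results). */
  def run(): (Int, Seq[Int]) = {
    // Assumption: Akka 2.6+, so the implicit system also acts as materializer.
    implicit val system: ActorSystem = ActorSystem("parallelism-demo")

    // Dedicated pool for blocking work, deliberately larger than the limit
    // so that any cap we observe comes from the stream, not from the pool.
    val pool = Executors.newFixedThreadPool(16)
    val blockingEc = ExecutionContext.fromExecutorService(pool)

    val inFlight = new AtomicInteger(0) // tasks running right now
    val peak     = new AtomicInteger(0) // highest inFlight value observed

    // Hypothetical stand-in for a blocking download.
    def fakeDownload(n: Int): Future[Int] = Future {
      val now = inFlight.incrementAndGet()
      peak.accumulateAndGet(now, (a, b) => math.max(a, b))
      try { Thread.sleep(20); n }
      finally inFlight.decrementAndGet()
    }(blockingEc)

    try {
      val results = Await.result(
        Source(1 to 40)
          .mapAsyncUnordered(4)(fakeDownload) // at most 4 futures outstanding
          .runWith(Sink.seq),
        30.seconds
      )
      (peak.get, results)
    } finally {
      pool.shutdown()
      system.terminate()
    }
  }
}
```

Because the stage only calls the function when it has a free slot, the observed peak should not exceed the parallelism value even though the thread pool could run more tasks.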

The mapAsync (or mapAsyncUnordered) stage should be made part of your DownloadFlow. Assuming your DownloadFlow runs asynchronous code, you could structure it like this:

def download(input: Input): Future[Output] = ???

val downloadFlow: Flow[Input, Output, NotUsed] = Flow[Input].mapAsyncUnordered(50)(download)

val (killSwitch, last) = source
  .buffer(conf.crawlStreamConf.maxCrawlConcurrency, OverflowStrategy.backpressure)
  .viaMat(downloadFlow)(Keep.both)
  .map(x => println(x))
  .to(Sink.ignore)
  .run()

Because your download flow has a meaningful materialized value, the real version will probably be slightly more complex, but hopefully you get the idea.
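As a rough sketch of what that could look like, the flow below materializes a kill switch, and the sink's completion future is kept alongside it. This is only one possible arrangement, not necessarily how the original DownloadFlow is built: the `download` function, the repeated placeholder link, and the choice of `KillSwitches.single` are assumptions for illustration, and the code assumes Akka 2.6+.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object KillSwitchDemo {
  def run(): Done = {
    // Assumption: Akka 2.6+, so the implicit system also acts as materializer.
    implicit val system: ActorSystem = ActorSystem("kill-switch-demo")
    import system.dispatcher

    // Hypothetical asynchronous download; here it just measures the link.
    def download(link: String): Future[Int] = Future(link.length)

    // A flow that limits concurrency and exposes a kill switch
    // as its materialized value.
    val downloadFlow: Flow[String, Int, UniqueKillSwitch] =
      Flow[String]
        .mapAsyncUnordered(50)(download)
        .viaMat(KillSwitches.single)(Keep.right)

    // Endless placeholder source standing in for the Hazelcast-backed one.
    val links = Source.repeat("http://example.invalid/page")

    // Keep the flow's kill switch and the sink's completion future.
    val (killSwitch, done) = links
      .viaMat(downloadFlow)(Keep.right)
      .toMat(Sink.ignore)(Keep.both)
      .run()

    killSwitch.shutdown() // completes downstream and cancels upstream
    try Await.result(done, 10.seconds)
    finally system.terminate()
  }
}
```

Using `toMat(...)(Keep.both)` instead of `to(...)` is what makes the second element of the tuple the sink's `Future[Done]`, which can be used to wait for the stream to finish.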

See the docs for more info.

Stefano Bonetti answered Oct 17 '22 02:10