Akka Streams significantly reduces my boilerplate code and contains many useful features. However, I need to be able to limit the speed at which items are processed. The problem is that I am feeding links to resources to download (all from a single online site) into a Hazelcast queue attached to a source, and over time the number of links entering the queue can grow quite large. Ideally, no more than 50-60 requests would run at once. Is there a feature in Akka Streams that would allow me to limit the number of items being processed at once?
A further limitation is the need for complex state management, code processing, and other features when interacting with certain websites, which Akka HTTP cannot help with. My network code is written entirely with Jsoup and Apache HttpComponents, with an occasional call to a JavaFX-based server to render scripts.
My current attempt to control the rate of input, using a buffer as described in the docs, is as follows:
val sourceGraph: Graph[SourceShape[(FlowConfig, Term)], NotUsed] = new HazelcastTermSource(conf.termQueue, conf)
val source = Source.fromGraph(sourceGraph)
val (killSwitch, last) = source
  .buffer(conf.crawlStreamConf.maxCrawlConcurrency, OverflowStrategy.backpressure)
  .viaMat(new DownloadFlow())(Keep.both)
  .map(x => println(x))
  .to(Sink.ignore).run()
The mechanism you're looking for is mapAsync (or mapAsyncUnordered, if ordering does not need to be preserved, as in your example). These combinators take a parallelism parameter that limits the number of tasks the stage can run in parallel. It should be made part of your DownloadFlow.
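To check that the parallelism argument really caps the number of in-flight tasks, here is a minimal, self-contained sketch. It assumes Akka 2.6, where an implicit ActorSystem also provides the materializer. The fakeDownload function is a hypothetical stand-in that sleeps instead of doing I/O, and an AtomicInteger records the highest number of tasks that were running at the same moment.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

object ParallelismDemo {
  // Maximum number of tasks mapAsyncUnordered may run at once.
  val Parallelism = 50

  // Tasks currently running, and the highest count ever observed.
  val inFlight = new AtomicInteger(0)
  val maxSeen = new AtomicInteger(0)

  // Raise maxSeen to n if n is larger (lock-free compare-and-set loop).
  private def recordMax(n: Int): Unit = {
    var current = maxSeen.get()
    while (n > current && !maxSeen.compareAndSet(current, n)) current = maxSeen.get()
  }

  // Hypothetical stand-in for a real download: sleeps instead of doing I/O.
  def fakeDownload(link: Int)(implicit ec: ExecutionContext): Future[Int] = Future {
    recordMax(inFlight.incrementAndGet())
    Thread.sleep(10)
    inFlight.decrementAndGet()
    link
  }

  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("parallelism-demo")
    // Pool deliberately larger than Parallelism, so any cap observed comes from the stage.
    val pool = Executors.newFixedThreadPool(Parallelism * 2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

    val done = Source(1 to 500)
      .mapAsyncUnordered(Parallelism)(link => fakeDownload(link))
      .runWith(Sink.ignore)

    Await.result(done, 30.seconds)
    pool.shutdown()
    system.terminate()
    maxSeen.get()
  }
}
```

ParallelismDemo.run() should never return more than 50, even though the thread pool could run 100 tasks; the stage simply stops pulling new elements while 50 futures are still pending.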
Assuming your DownloadFlow runs asynchronous code, you could structure it like this:
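// Input, Output and download are placeholders for your own types and logic.
def download(input: Input): Future[Output] = ???
// At most 50 download futures are in flight at once; results are emitted as they complete.
val downloadFlow: Flow[Input, Output, NotUsed] = Flow[Input].mapAsyncUnordered(50)(download)
val (killSwitch, last) = source
  .buffer(conf.crawlStreamConf.maxCrawlConcurrency, OverflowStrategy.backpressure)
  .viaMat(downloadFlow)(Keep.both)
  .map(x => println(x))
  .to(Sink.ignore).run()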
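Since the network code in the question (Jsoup, Apache HttpComponents) is blocking, download has to wrap it in a Future. A minimal sketch, assuming a hypothetical blocking fetchBlocking(url) in place of the real Jsoup or HttpComponents call and placeholder example.com URLs: the blocking work runs on a dedicated thread pool sized to the same parallelism, so it does not starve Akka's default dispatcher.

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

object BlockingDownloadFlow {
  // Upper bound on simultaneous requests (the question asks for 50-60).
  val MaxConcurrency = 50

  // Dedicated pool for blocking I/O, sized to the stream's parallelism.
  private val blockingPool = Executors.newFixedThreadPool(MaxConcurrency)
  private implicit val blockingEc: ExecutionContext =
    ExecutionContext.fromExecutorService(blockingPool)

  // Hypothetical stand-in for a blocking Jsoup / HttpComponents request.
  def fetchBlocking(url: String): String = {
    Thread.sleep(20) // simulate network latency
    s"<html>$url</html>"
  }

  // Each element becomes a Future on the blocking pool; mapAsyncUnordered
  // never lets more than MaxConcurrency of them run at the same time.
  val downloadFlow: Flow[String, String, NotUsed] =
    Flow[String].mapAsyncUnordered(MaxConcurrency)(url => Future(fetchBlocking(url)))

  def run(n: Int): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("blocking-download")
    val pages = Source(1 to n)
      .map(i => s"https://example.com/page/$i") // placeholder URLs
      .via(downloadFlow)
      .runWith(Sink.seq)
    val result = Await.result(pages, 30.seconds)
    system.terminate()
    result
  }

  def shutdown(): Unit = blockingPool.shutdown()
}
```

A pool smaller than MaxConcurrency would become the real bottleneck, while a much larger one would only waste threads, because the stage never submits more than MaxConcurrency tasks at a time.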
Since your DownloadFlow has a meaningful materialized value, the wiring will probably be slightly more complex, but hopefully you get the idea.
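On combining materialized values: with sourceGraph typed as NotUsed, the first element of (killSwitch, last) in the question's code is just NotUsed, not a kill switch. A minimal sketch of one way to materialize a working kill switch alongside the completion future, using KillSwitches.single and toMat(...)(Keep.both). Source.repeat stands in for the Hazelcast source and the Future around url.length is a placeholder download; both are assumptions for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

import akka.Done
import akka.actor.ActorSystem
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}

object KillSwitchDemo {
  def run(): Done = {
    implicit val system: ActorSystem = ActorSystem("killswitch-demo")
    import system.dispatcher // ExecutionContext for the placeholder futures

    val (killSwitch, done): (UniqueKillSwitch, Future[Done]) =
      Source.repeat("https://example.com/")               // stand-in for the Hazelcast source
        .viaMat(KillSwitches.single)(Keep.right)          // keep only the kill switch
        .mapAsyncUnordered(50)(url => Future(url.length)) // placeholder download
        .toMat(Sink.ignore)(Keep.both)                    // keep the switch and the completion future
        .run()

    Thread.sleep(100)
    killSwitch.shutdown() // completes the stream gracefully
    val result = Await.result(done, 10.seconds)
    system.terminate()
    result
  }
}
```

Keep.right drops the source's NotUsed, and Keep.both at the sink pairs the kill switch with the Future[Done] that completes once the stream has finished.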
See the Akka Streams documentation for mapAsync and mapAsyncUnordered for more information.