I'm using Akka actors in Scala to download resources from an external service (HTTP GET requests). The response from the external service is JSON, and I have to use paging because the provider is very slow. I want to download all paged results concurrently in 10 threads. I use a URL such as this to download a chunk: http://service.com/items?limit=50&offset=1000
I have created the following pipeline:
ScatterActor => RoundRobinPool[10](LoadChunkActor) => Aggregator
ScatterActor takes the total count of items to download and divides it into chunks. I created 10 LoadChunkActors to process tasks concurrently.
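override def receive: Receive = {
  case LoadMessage(limit) =>
    // One offset per chunk: 0, chunkSize, 2 * chunkSize, ...
    val offsets: IndexedSeq[Int] = 0 until limit by chunkSize
    // Send one load task per chunk to the worker pool
    offsets.foreach(offset => context.system.actorSelection(pipe) !
      LoadMessage(chunkSize, offset))
}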
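To make the chunking step concrete, here is a minimal sketch of how the total count could be split into (limit, offset) pairs. The `Chunking` object, the `Chunk` case class and the `chunks` helper are hypothetical names used only for illustration; they are not part of the ScatterActor above. Unlike the snippet above, this sketch also shortens the last chunk when the total is not a multiple of the chunk size, which is an assumption about what the service expects.

```scala
object Chunking {
  // Hypothetical type for illustration: one page request.
  final case class Chunk(limit: Int, offset: Int)

  // Splits `total` items into pages of at most `chunkSize` items each.
  def chunks(total: Int, chunkSize: Int): IndexedSeq[Chunk] = {
    require(chunkSize > 0, "chunkSize must be positive")
    (0 until total by chunkSize).map { offset =>
      // The last page may be shorter than chunkSize.
      Chunk(math.min(chunkSize, total - offset), offset)
    }
  }
}
```

For example, `Chunking.chunks(120, 50)` yields `Chunk(50, 0)`, `Chunk(50, 50)` and `Chunk(20, 100)`.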
LoadChunkActor uses Spray to send the request. The actor looks like this:
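import scala.util.Success
import spray.client.pipelining._

val pipeline = sendReceive ~> unmarshal[List[Items]]
override def receive: Receive = {
  case LoadMessage(limit, offset) =>
    val uri: String = s"http://service.com/items?limit=$limit&offset=$offset"
    // pipeline returns a Future immediately; the actor does not wait for the response
    val responseFuture = pipeline { Get(uri) }
    // Only the success case is handled; failed requests (e.g. timeouts) are not reported
    responseFuture onComplete {
      case Success(items) => aggregator ! Loaded(items)
    }
}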
As you can see, LoadChunkActor requests a chunk from the external service and registers a callback to run onComplete. The actor is then immediately ready to take another message, so it requests another chunk right away. Spray uses a nonblocking API to download chunks. As a result, the external service is flooded with my requests and I get timeouts.
How can I schedule a list of tasks so that at most 10 are processed at the same time?
I have created the following solution (similar to the work-pulling pattern, http://www.michaelpollmeier.com/akka-work-pulling-pattern/):
ScatterActor (10000x messages) =>
ThrottleActor => LoadChunkActor => ThrottleMonitorActor => Aggregator
      ^                                      |
      |<-----------WorkDoneMessage-----------|
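The bookkeeping that a ThrottleActor in this diagram needs can be sketched without any Akka dependency. The following is only an illustration under assumptions: `Throttle`, `offer`, `workDone` and `maxInFlight` are hypothetical names, and the actual actor wiring (receiving LoadMessage and WorkDoneMessage, forwarding to LoadChunkActor) is left out. The idea is to queue incoming tasks and release one only while fewer than `maxInFlight` tasks are outstanding.

```scala
import scala.collection.immutable.Queue

// Hypothetical, immutable throttle state: not taken from the original post.
final case class Throttle[A](
    maxInFlight: Int,
    pending: Queue[A] = Queue.empty[A],
    inFlight: Int = 0
) {
  // A new task arrives: queue it, then release whatever the limit allows.
  def offer(task: A): (Throttle[A], List[A]) =
    copy(pending = pending.enqueue(task)).release

  // A worker reported completion: free one slot, then release queued tasks.
  def workDone: (Throttle[A], List[A]) =
    copy(inFlight = math.max(0, inFlight - 1)).release

  // Moves as many queued tasks as there are free slots into the in-flight set.
  private def release: (Throttle[A], List[A]) = {
    val free = math.max(0, maxInFlight - inFlight)
    val (ready, rest) = pending.splitAt(free)
    (copy(pending = rest, inFlight = inFlight + ready.size), ready.toList)
  }
}
```

Inside an actor, the state could be held in a `var`: on each LoadMessage call `offer` and send every released task to LoadChunkActor; on each WorkDoneMessage call `workDone` and do the same. With `maxInFlight = 10`, no more than 10 requests are outstanding at any time, however many messages ScatterActor sends.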