Throttle HTTP request on Akka/Spray

I'm using Akka actors in Scala to download resources from an external service (HTTP GET requests). The response from the external service is JSON, and I have to use paging (the provider is very slow). I want to download all paged results concurrently in 10 threads. I use a URL such as this one to download a chunk: http://service.com/items?limit=50&offset=1000

I have created the following pipeline:

ScatterActor => RoundRobinPool[10](LoadChunkActor) => Aggregator

ScatterActor takes the total count of items to download and divides it into chunks. I created 10 LoadChunkActors to process the tasks concurrently.

override def receive: Receive = {
  case LoadMessage(limit) =>
    // One chunk per offset: 0, chunkSize, 2 * chunkSize, ... below limit
    val offsets: IndexedSeq[Int] = 0 until limit by chunkSize
    offsets.foreach(offset =>
      context.system.actorSelection(pipe) ! LoadMessage(chunkSize, offset))
}
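
The chunking step in ScatterActor can be checked outside Akka. This is a minimal sketch, assuming the hypothetical names `Chunk` and `Chunking.chunks` (not from the original), that produces the same (limit, offset) pairs the actor sends to the pool:

```scala
// Hypothetical message type standing in for LoadMessage(chunkSize, offset).
final case class Chunk(limit: Int, offset: Int)

object Chunking {
  // Split `total` items into pages of `chunkSize`, mirroring
  // `0 until limit by chunkSize` in ScatterActor.
  def chunks(total: Int, chunkSize: Int): IndexedSeq[Chunk] = {
    require(chunkSize > 0, "chunkSize must be positive")
    (0 until total by chunkSize).map(offset => Chunk(chunkSize, offset))
  }

  def main(args: Array[String]): Unit = {
    val cs = chunks(total = 120, chunkSize = 50)
    println(cs.map(_.offset).mkString(", ")) // 0, 50, 100
  }
}
```

The last chunk still asks for `chunkSize` items; the service simply returns fewer (20 in this example).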

LoadChunkActor uses Spray to send the request. The actor looks like this:

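import context.dispatcher // implicit ExecutionContext for the Future callbacks
import spray.client.pipelining._
import scala.util.{Failure, Success}

// Needs an implicit Unmarshaller[List[Items]] in scope (e.g. from spray-json)
val pipeline = sendReceive ~> unmarshal[List[Items]]

override def receive: Receive = {
  case LoadMessage(limit, offset) =>
    val uri: String = s"http://service.com/items?limit=$limit&offset=$offset"
    val responseFuture = pipeline { Get(uri) }
    // Non-blocking: receive returns right away and the callback runs later
    responseFuture onComplete {
      case Success(items) => aggregator ! Loaded(items)
      case Failure(e)     => log.error(e, s"Chunk at offset $offset failed") // assumes ActorLogging
    }
}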

As you can see, LoadChunkActor requests a chunk from the external service and registers a callback to run onComplete. The actor is then immediately ready to take another message, so it requests another chunk. Spray uses a non-blocking API to download chunks, so having 10 actors does not limit the number of requests in flight. As a result, the external service is flooded with my requests and I get timeouts.
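
To see why a pool of 10 does not cap the load, here is a minimal, Akka-free simulation. `SlowService`, `Worker` and `FloodDemo` are hypothetical names: `SlowService.get` stands in for Spray's non-blocking pipeline by returning a Future that is never completed during the demo, and `Worker.handle` mirrors LoadChunkActor's receive, which returns as soon as the request is sent.

```scala
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

// Hypothetical stand-in for a slow, non-blocking HTTP client: get() returns
// an uncompleted Future immediately and records the request as in flight.
final class SlowService {
  private val pending = mutable.Queue.empty[Promise[String]]
  def get(offset: Int): Future[String] = {
    val p = Promise[String]()
    pending.enqueue(p)
    p.future
  }
  def inFlight: Int = pending.size
}

// Hypothetical stand-in for LoadChunkActor: fire the request and return at
// once, so the "actor" is free for the next message before any reply arrives.
final class Worker(service: SlowService) {
  def handle(offset: Int): Unit = { service.get(offset); () }
}

object FloodDemo {
  // Route `chunks` messages round-robin over `poolSize` workers and report how
  // many requests are outstanding before any response has come back.
  def inFlightAfterDispatch(poolSize: Int, chunks: Int): Int = {
    require(poolSize > 0, "poolSize must be positive")
    val service = new SlowService
    val pool = Vector.fill(poolSize)(new Worker(service))
    (0 until chunks).foreach(i => pool(i % poolSize).handle(i * 50))
    service.inFlight
  }

  def main(args: Array[String]): Unit =
    println(inFlightAfterDispatch(poolSize = 10, chunks = 200)) // 200
}
```

Whatever the pool size, every chunk ends up in flight at once: the pool only bounds how many messages are being handled at the same instant, not how many requests are outstanding.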

How can I schedule a list of tasks so that at most 10 are processed at the same time?

KrzyH asked Aug 07 '14 06:08



1 Answer

I have created the following solution (similar to the work-pulling pattern described at http://www.michaelpollmeier.com/akka-work-pulling-pattern/):

ScatterActor (10000x messages) => 
  ThrottleActor => LoadChunkActor => ThrottleMonitorActor => Aggregator
         ^                                    |
         |<--------WorkDoneMessage------------|
  1. ThrottleActor puts incoming messages into a ListBuffer and sends at most N of them to LoadChunkActor.
  2. LoadChunkActor sends each result to Aggregator through ThrottleMonitorActor.
  3. ThrottleMonitorActor sends a confirmation (WorkDoneMessage) to ThrottleActor.
  4. ThrottleActor then sends the next buffered message to LoadChunkActor.
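
The ThrottleActor bookkeeping from steps 1 to 4 can be sketched without Akka. This is a minimal sketch under assumptions: `Throttle`, `submit`, `workDone` and `ThrottleDemo` are hypothetical names, and a `mutable.Queue` replaces the ListBuffer mentioned above. Inside ThrottleActor, the items returned by these methods would be forwarded to LoadChunkActor, and each WorkDoneMessage from ThrottleMonitorActor would trigger `workDone()`.

```scala
import scala.collection.mutable

// Hypothetical, Akka-free model of ThrottleActor's state: a buffer of pending
// work plus a count of items handed out but not yet confirmed.
final class Throttle[A](maxInFlight: Int) {
  require(maxInFlight > 0, "maxInFlight must be positive")

  private val buffer = mutable.Queue.empty[A]
  private var inFlight = 0

  // Step 1: buffer incoming work and release whatever fits under the limit.
  def submit(work: A): List[A] = {
    buffer.enqueue(work)
    drain()
  }

  // Steps 3-4: a confirmation frees one slot, so release the next item.
  def workDone(): List[A] = {
    require(inFlight > 0, "workDone without outstanding work")
    inFlight -= 1
    drain()
  }

  def outstanding: Int = inFlight
  def queued: Int = buffer.size

  // Hand out buffered items until the limit is reached or the buffer is empty.
  private def drain(): List[A] = {
    val released = List.newBuilder[A]
    while (inFlight < maxInFlight && buffer.nonEmpty) {
      released += buffer.dequeue()
      inFlight += 1
    }
    released.result()
  }
}

object ThrottleDemo {
  def main(args: Array[String]): Unit = {
    val throttle = new Throttle[Int](maxInFlight = 10)
    val first = (0 until 100).flatMap(throttle.submit)
    println(first.size)          // 10
    println(throttle.workDone()) // List(10)
  }
}
```

In the actor, `receive` could then do something like `case msg: LoadMessage => throttle.submit(msg).foreach(loadChunkActor ! _)` and `case WorkDoneMessage => throttle.workDone().foreach(loadChunkActor ! _)`. Because a slot is freed only when the confirmation arrives after the response, at most N requests are outstanding however quickly Spray's futures return. A failed request should also produce a confirmation, otherwise its slot is never freed.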
KrzyH answered Oct 01 '22 22:10
