Limit concurrent Web Service requests (or some batch approach)

I have a for-comprehension that fetches a comma-separated ID list from a web service.
I then use the ID list to make new calls. My problem is that the list can be around 10,000 IDs long and each call returns a medium-sized XML document.
The web service endpoint (or possibly the Play Framework) doesn't quite like it when I request all 10,000 at the same time asynchronously, as I only get around 500 correct responses.

Some pseudo-code to highlight the intent:

for {
  respA <- WS.url(url1).get
  id <- respA.body.split(",")
  respB <- WS.url(url2 + id).get
} yield ...

How do I go about limiting the concurrent requests to something more feasible?

Asked Jul 07 '13 by Farmor


2 Answers

Here is an example app that batches 10,000 requests (via Play's WS library) into groups of 1,000, all in an async and non-blocking way:

package controllers

import play.api.libs.concurrent.Promise
import scala.concurrent.duration._
import play.api.libs.ws.WS
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import play.api.mvc.{Action, Controller}
import play.api.libs.ws.Response
import play.api.Logger

object Application extends Controller {

  // Counter used by the test endpoint below to show how many requests have arrived
  var numRequests = 0

  def index = Action {
    Async {
      // Group the 10,000 request holders into batches of 1,000
      val batches: Iterator[Seq[WS.WSRequestHolder]] = requests.grouped(1000)

      // Chain the batches with foldLeft/flatMap so each batch only starts once the previous one has completed
      val allBatchesFutureResponses = batches.foldLeft(Future.successful(Seq.empty[Response])) { (allFutureResponses, batch) =>
        allFutureResponses.flatMap { responses =>
          // Fire every request in the current batch concurrently and collect the responses
          val batchFutures = Future.sequence(batch.map(_.get))
          batchFutures.map { batchResponses =>
            responses ++ batchResponses
          }
        }
      }

      allBatchesFutureResponses.map { responses =>
        Logger.info(responses.size.toString)
        Ok
      }
    }
  }

  // Build the 10,000 request holders, all pointing at the local /pause endpoint below
  def requests = (1 to 10000).map { i =>
    WS.url("http://localhost:9000/pause")
  }

  // Test endpoint that counts incoming requests and replies after a one-second delay
  def pause = Action {
    Async {
      Logger.info(numRequests.toString)
      numRequests = numRequests + 1 // not thread-safe, but good enough for a local demo
      Promise.timeout(Ok, 1 seconds)
    }
  }

}
Answered by James Ward


You need to do some sort of throttling.

Akka

How about using some Akka Actors to make the requests? Check out these approaches to throttling with Akka:

  • Have a number of child Actors equal to the number of concurrent requests you want to make. Each child actor sends a response to the parent Actor on completion of the HTTP request Future. Each time a child Actor responds, send it the next request to make.
  • Use Akka's TimerBasedThrottler to drip-feed messages to child Actors that make the HTTP requests (see the sketch after this list): http://doc.akka.io/docs/akka/2.1.2/contrib/throttle.html
  • https://stackoverflow.com/a/9615080/936869
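
For the TimerBasedThrottler option, here is a minimal sketch (assuming Akka's akka-contrib module is on the classpath; the Fetch message, Fetcher actor, URL, and the 100 msgs/second rate are made up for illustration, not taken from the question):

import akka.actor.{Actor, ActorSystem, Props}
import akka.contrib.throttle.TimerBasedThrottler
import akka.contrib.throttle.Throttler._
import play.api.libs.ws.WS
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical message and worker actor, just to show the wiring
case class Fetch(id: String)

class Fetcher extends Actor {
  def receive = {
    case Fetch(id) =>
      // One WS call per message; process the response however the app needs
      WS.url("http://localhost:9000/item/" + id).get().map { resp =>
        // ... handle resp.body ...
      }
  }
}

object Throttled {
  val system = ActorSystem("throttling")
  val fetcher = system.actorOf(Props[Fetcher])

  // Let at most 100 Fetch messages per second through to the fetcher
  val throttler = system.actorOf(Props(new TimerBasedThrottler(100 msgsPer 1.second)))
  throttler ! SetTarget(Some(fetcher))

  // Drip-feed the ids through the throttler instead of firing them all at once
  def fetchAll(ids: Seq[String]): Unit =
    ids.foreach(id => throttler ! Fetch(id))
}

Note that this caps the rate at which requests are started rather than the number in flight, which is usually enough to keep the endpoint happy.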

Just with Futures

If you want to use plain Futures and no Akka Actors, you could use a combination of flatMap (to chain HTTP requests so they happen one after another) and Future.sequence to get the level of parallelism you want.
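
A minimal sketch of that combination (the throttled helper name and the group size are just for illustration):

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper: apply `f` to the items, at most `parallelism` at a time.
// Each group runs concurrently via Future.sequence; the groups themselves are
// chained with flatMap, so a new group only starts once the previous one is done.
def throttled[A, B](items: Seq[A], parallelism: Int)(f: A => Future[B])
                   (implicit ec: ExecutionContext): Future[Seq[B]] =
  items.grouped(parallelism).foldLeft(Future.successful(Seq.empty[B])) { (acc, group) =>
    acc.flatMap { done =>
      Future.sequence(group.map(f)).map(done ++ _)
    }
  }

With the question's setup you would call it as something like throttled(ids, 500)(id => WS.url(url2 + id).get()), which keeps at most 500 requests in flight at once.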

Answered by theon