"Throttled" async download in F#

I'm trying to download the 3000+ photos referenced from the XML backup of my blog. The problem I came across is that if just one of those photos is no longer available, the whole async workflow gets blocked, because AsyncGetResponse doesn't time out.

ildjarn helped me put together a version of AsyncGetResponse that does fail on timeout, but using it gives a lot more timeouts, as though requests that are merely queued also time out. It seems that all the WebRequests are launched 'immediately', and the only way to make it work is to set the timeout to the time required to download all of them combined. That isn't great, because it means I have to adjust the timeout depending on the number of images.

Have I reached the limits of vanilla async? Should I be looking at reactive extensions instead?

This is a bit embarrassing, because I've already asked two questions here on this particular bit of code, and I still haven't got it working the way I want!

Benjol asked Jun 02 '11 20:06


1 Answer

I think there must be a better way to find out that a file is not available than using a timeout. I'm not exactly sure, but is there some way to make it throw an exception if a file cannot be found? Then you could just wrap your async code inside try .. with and you should avoid most of the problems.
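A minimal sketch of the try .. with idea, assuming the request is made with WebRequest and AsyncGetResponse as in the question. As far as I know, an HTTP error status such as 404 surfaces as a WebException, so a missing photo can be skipped instead of waited on. The name tryFetch and the choice to return an option are my own, not something from the question.

```fsharp
open System.IO
open System.Net

/// Hypothetical helper: fetches 'url' into memory and returns None when
/// the request fails with a WebException (e.g. 404 or connection refused).
let tryFetch (url: string) : Async<byte[] option> = async {
  try
    let request = WebRequest.Create(url)
    use! response = request.AsyncGetResponse()
    use stream = response.GetResponseStream()
    use buffer = new MemoryStream()
    // Copy the response body without blocking a thread
    do! stream.CopyToAsync(buffer) |> Async.AwaitTask
    return Some (buffer.ToArray())
  with
  | :? WebException as ex ->
      // Only WebException is handled here; other exceptions still propagate
      printfn "Skipping %s: %s" url ex.Message
      return None }
```

This only covers servers that answer with an error; a server that never answers would presumably still need some kind of timeout.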

Anyway, if you want to write your own "concurrency manager" that runs a certain number of requests in parallel and queues the remaining pending requests, then the easiest option in F# is to use agents (the MailboxProcessor type). The following object encapsulates the behavior:

type ThrottlingAgentMessage = 
  | Completed
  | Work of Async<unit>

/// Represents an agent that runs operations concurrently. When the number
/// of concurrent operations reaches 'limit', further work is queued and
/// processed later
type ThrottlingAgent(limit) = 
  let agent = MailboxProcessor.Start(fun agent -> 
    /// Represents a state when the agent is blocked ('limit' items running)
    let rec waiting () = 
      // Use 'Scan' to wait for completion of some work; queued 'Work'
      // messages stay in the mailbox until a slot frees up
      agent.Scan(function
        | Completed -> Some(working (limit - 1))
        | _ -> None)
    /// Represents a state when the agent is working ('count' items running)
    and working count = async { 
      // Receive any message 
      let! msg = agent.Receive()
      match msg with 
      | Completed -> 
          // Decrement the counter of work items
          return! working (count - 1)
      | Work work ->
          // Start the work item & continue in blocked/working state
          async { try do! work 
                  finally agent.Post(Completed) }
          |> Async.Start
          // Block once this item brings the running count up to 'limit'
          if count + 1 < limit then return! working (count + 1)
          else return! waiting () }
    working 0)      

  /// Queue the specified asynchronous workflow for processing
  member x.DoWork(work) = agent.Post(Work work)
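As a rough illustration of how work might be queued on the agent, here is a sketch. The names downloadPhoto, queueDownloads and urls, the example.com addresses and the limit of 5 are all hypothetical, and the Async.Sleep call merely stands in for a real web request.

```fsharp
/// Hypothetical stand-in for the real download workflow.
let downloadPhoto (url: string) : Async<unit> = async {
  do! Async.Sleep 100            // placeholder for the actual web request
  printfn "finished %s" url }

/// Queues one download per URL through any 'DoWork'-shaped function.
let queueDownloads (doWork: Async<unit> -> unit) (urls: seq<string>) =
  for url in urls do
    doWork (downloadPhoto url)

// Made-up URLs, for illustration only
let urls = [ for i in 1 .. 20 -> sprintf "http://example.com/photo%d.jpg" i ]

// With the ThrottlingAgent type from this answer in scope
// (the limit of 5 is an arbitrary choice):
//   let throttler = ThrottlingAgent(5)
//   queueDownloads throttler.DoWork urls
```

Note that DoWork only posts a message and returns immediately, so a console program has to stay alive (for instance by waiting for a key press) until the queued work has finished.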
Tomas Petricek answered Sep 28 '22 06:09