
How can I throttle a large array of async workflows passed to Async.Parallel?

I have an array holding a large number of small async database queries; for example:

// I actually have a more complex function that
// accepts name/value pairs for query parameters.
let runSql connString sql = async {
    use connection = new SqlConnection(connString)
    use command = new SqlCommand(sql, connection)
    // Async.AwaitTask (F# 4.0+) awaits the Task and propagates a failed open as an exception
    do! connection.OpenAsync() |> Async.AwaitTask
    return! command.ExecuteScalarAsync() |> Async.AwaitTask
    }

let getName (id:Guid) = async {
    // I actually use a parameterized query
    let querySql = "SELECT Name FROM Entities WHERE ID = '" + id.ToString() + "'"
    return! runSql connectionString querySql
    }

let ids : Guid array = getSixtyThousandIds()

let asyncWorkflows = ids |> Array.map getName
//...

Now, the problem: the next expression runs all 60K workflows at once, flooding the server. As a result, many of the SqlCommands time out. It also typically causes out-of-memory exceptions in the client (which is F# Interactive), for reasons I do not understand and, not needing to understand them, have not investigated:

//...
let names =
    asyncWorkflows
    |> Async.Parallel
    |> Async.RunSynchronously

I've written a rough-and-ready function to batch the requests:

let batch batchSize asyncs = async {
    let batches = asyncs
                  |> Seq.mapi (fun i a -> i, a)
                  |> Seq.groupBy (fst >> fun n -> n / batchSize)
                  |> Seq.map (snd >> Seq.map snd)
                  |> Seq.map Async.Parallel
    let results = ref []
    for batch in batches do
        let! result = batch
        results := (result :: !results)
    return (!results |> List.rev |> Seq.collect id |> Array.ofSeq)
}

To use this function, I replace Async.Parallel with batch 20 (or another integer value):

let names =
    asyncWorkflows
    |> batch 20
    |> Async.RunSynchronously

This works reasonably well, but I would prefer a system that starts each new async as soon as one completes. Rather than running successive batches of size N, each starting only after the previous batch has finished, I would always be awaiting N active SqlCommands (until I get to the end, of course).

Questions:

  • Am I reinventing the wheel? In other words, are there library functions that do this already? (Would it be profitable to look into exploiting ParallelEnumerable.WithDegreeOfParallelism somehow?)

  • If not, how should I implement a continuous queue instead of a series of discrete batches?

I am not primarily seeking suggestions to improve the existing code, but such suggestions will nonetheless be received with interest and gratitude.

asked Jun 17 '15 00:06 by phoog

1 Answer

FSharpx.Control offers an Async.ParallelWithThrottle function. I'm not sure it is the best possible implementation, since it uses SemaphoreSlim, but it is very easy to use, and because my application doesn't need top performance it works well enough for me. That said, it is a library, so if someone knows how to make it faster, that would be welcome: libraries that perform well out of the box let the rest of us just use code that works and get our work done!
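
For illustration, here is a minimal sketch of what SemaphoreSlim-based throttling can look like. It is not the FSharpx source, and `parallelThrottled` is a hypothetical name chosen for this example. The idea is that every workflow waits for a free slot before running, so a new one starts as soon as any active one finishes, and at most `limit` are active at a time.

```fsharp
open System.Threading

// Hypothetical helper (an assumption for this sketch, not a library function):
// runs the workflows in parallel, but lets at most `limit` be active at once.
let parallelThrottled (limit: int) (workflows: seq<Async<'T>>) : Async<'T[]> =
    async {
        use gate = new SemaphoreSlim(limit, limit)
        let throttled (work: Async<'T>) =
            async {
                // Pass the cancellation token so that waiters are released
                // if the overall computation is cancelled or another workflow fails.
                let! ct = Async.CancellationToken
                // Wait for a free slot without blocking a thread.
                do! gate.WaitAsync(ct) |> Async.AwaitTask
                try
                    return! work
                finally
                    // Free the slot so the next waiting workflow can start.
                    gate.Release() |> ignore
            }
        // Async.Parallel returns results in the same order as the inputs.
        return! workflows |> Seq.map throttled |> Async.Parallel
    }

// Usage sketch with dummy workflows standing in for the SQL queries.
let demo =
    [ for i in 1 .. 10 -> async { do! Async.Sleep 10
                                  return i * i } ]
    |> parallelThrottled 3
    |> Async.RunSynchronously
```

In the question's code this would replace `batch 20` with `parallelThrottled 20`. Note that all wrapped workflows are still created and queued on the semaphore up front; only the wrapped work itself is limited. Newer FSharp.Core versions (4.7 and later) also accept a limit directly, as in `Async.Parallel(asyncWorkflows, maxDegreeOfParallelism = 20)`, which may make a helper like this unnecessary.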

answered Nov 17 '22 19:11 by Jon49