I have an array holding a large number of small async database queries; for example:
// I actually have a more complex function that
// accepts name/value pairs for query parameters.
let runSql connString sql = async {
use connection = new SqlConnection(connString)
use command = new SqlCommand(sql, connection)
do! connection.OpenAsync() |> Async.AwaitIAsyncResult |> Async.Ignore
return! command.ExecuteScalarAsync() |> Async.AwaitTask
}
let getName (id:Guid) = async {
// I actually use a parameterized query
let querySql = "SELECT Name FROM Entities WHERE ID = '" + id.ToString() + "'"
return! runSql connectionString querySql
}
let ids : Guid array = getSixtyThousandIds()
let asyncWorkflows = ids |> Array.map getName
//...
Now, the problem: The next expression runs all 60K workflows at once, flooding the server. This leads to many of the SqlCommand
s timing out; it also typically causes out of memory exceptions in the client (which is F# interactive) for reasons I do not understand and (not needing to understand them) have not investigated:
//...
let names =
asyncWorkflows
|> Async.Parallel
|> Async.RunSynchronously
I've written a rough-and-ready function to batch the requests:
let batch batchSize asyncs = async {
let batches = asyncs
|> Seq.mapi (fun i a -> i, a)
|> Seq.groupBy (fst >> fun n -> n / batchSize)
|> Seq.map (snd >> Seq.map snd)
|> Seq.map Async.Parallel
let results = ref []
for batch in batches do
let! result = batch
results := (result :: !results)
return (!results |> List.rev |> Seq.collect id |> Array.ofSeq)
}
To use this function, I replace Async.Parallel
with batch 20
(or another integer value):
let names =
asyncWorkflows
|> batch 20
|> Async.RunSynchronously
This works reasonably well, but I would prefer to have a system that starts each new async as soon as one completes, so rather than successive batches of size N starting after each previous batch of size N has finished, I am always awaiting N active SqlCommand
s (until I get to the end, of course).
Questions:
Am I reinventing the wheel? In other words, are there library functions that do this already? (Would it be profitable to look into exploiting ParallelEnumerable.WithDegreeOfParallelism somehow?)
If not, how should I implement a continuous queue instead of a series of discrete batches?
I am not primarily seeking suggestions to improve the existing code, but such suggestions will nonetheless be received with interest and gratitude.
FSharpx.Control
offers an Async.ParallelWithThrottle
function. I'm not sure if it is the best implementation as it uses SemaphoreSlim
. But the ease of use is great and since my application doesn't need top performance it works well enough for me. Although since it is a library if someone knows how to make it better it is always a nice thing to make libraries top performers out of the box so the rest of us can just use the code that works and just get our work done!
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With