There are many examples how to execute async tasks in f# like
[dowork 1; work 2]
|> Async.Parallel
|> Async.RunSynchronously
But how I can asynchronously wait for only the first result?
For example if I want to run some parallel look up tasks and I want to search deeper when the first successful result is obtained. (Basically the equivalent as Task.WhenAny
in the C# world...)
I'd use something like:
let any asyncs =
async {
let t =
asyncs
|> Seq.map Async.StartAsTask
|> System.Threading.Tasks.Task.WhenAny
return t.Result.Result }
Another implementation based on an event:
let Choice (asyncs: seq<Async<'T>>) : Async<'T> =
async {
let e = Event<'T>()
let cts = new System.Threading.CancellationTokenSource()
do Async.Start(
asyncs
|> Seq.map (fun a -> async { let! x = a in e.Trigger x })
|> Async.Parallel
|> Async.Ignore,
cts.Token)
let! result = Async.AwaitEvent e.Publish
cts.Cancel()
return result
}
A general-purpose solution can be found in the following snippet: http://fssnip.net/dN
Async.Choice
can be embedded in any asynchronous workflow, just like Async.Parallel
. The optional output type encodes the possibility that a child computation can complete without a satisfactory result.
The simplest possible implementation I could think of looks something like this:
open FSharp.Control
let getOneOrOther () =
let queue = BlockingQueueAgent(1)
let async1 = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
do! queue.AsyncAdd(1) } |> Async.Start
let async2 = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
do! queue.AsyncAdd(2) } |> Async.Start
queue.Get()
for i in 1..10 do
printfn "%d" <| getOneOrOther ()
Console.ReadLine () |> ignore
It relies on the blocking queue implementation from the FSharpx project, which you will probably want for other reasons. But if you don't want any dependencies System.Collections.Concurrent
also includes a blocking queue, with a slightly less nice interface.
For a more general version with cancellation built in, the version below takes a Seq<unit -> Async<'T>>
and returns the first result to come through, cancelling all the others.
open FSharp.Control
open System.Threading
let async1 () = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
return 1 }
let async2 () = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
return 2 }
let getFirst asyncs =
let queue = BlockingQueueAgent(1)
let doWork operation = async {
let! result = operation()
do! queue.AsyncAdd(result) }
let start work =
let cts = new CancellationTokenSource()
Async.Start(work, cts.Token)
cts
let cancellationTokens =
asyncs
|> Seq.map doWork
|> Seq.map start
let result = queue.Get()
cancellationTokens
|> Seq.iter (fun cts -> cts.Cancel(); cts.Dispose())
result
for i in 1..10 do
printfn "%A" <| getFirst [async1;async2]
Console.ReadLine () |> ignore
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With