
F#: how to run several asynchronous tasks and wait for the result of the first one to complete?

There are many examples of how to execute async tasks in F#, like:

[work 1; work 2]
|> Async.Parallel
|> Async.RunSynchronously

But how can I asynchronously wait for only the first result?

For example, I want to run several lookup tasks in parallel and search deeper as soon as the first successful result is obtained (basically the equivalent of Task.WhenAny in the C# world).

Aleksander Kois asked Dec 11 '13 10:12


4 Answers

I'd use something like:

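let any asyncs =
    async {
        // Await the WhenAny task instead of blocking a thread on .Result
        let! first =
            asyncs
            |> Seq.map Async.StartAsTask
            |> System.Threading.Tasks.Task.WhenAny
            |> Async.AwaitTask
        // 'first' is the task that completed first, so reading .Result does not block
        return first.Result }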

kvb answered Nov 20 '22 02:11


Another implementation based on an event:

let Choice (asyncs: seq<Async<'T>>) : Async<'T> =
    async {
        let e = Event<'T>()
        let cts = new System.Threading.CancellationTokenSource()
        do Async.Start(
            asyncs
            |> Seq.map (fun a -> async { let! x = a in e.Trigger x })
            |> Async.Parallel
            |> Async.Ignore,
            cts.Token)
        let! result = Async.AwaitEvent e.Publish
        cts.Cancel()
        return result
    }

Tarmil answered Oct 19 '22 04:10


A general-purpose solution can be found in the following snippet: http://fssnip.net/dN

Async.Choice can be embedded in any asynchronous workflow, just like Async.Parallel. The optional output type encodes the possibility that a child computation can complete without a satisfactory result.
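
As a rough sketch of how a combinator with this shape is used: FSharp.Core 4.0 and later ship an Async.Choice of type seq<Async<'T option>> -> Async<'T option>, and the example below assumes that built-in version (the linked snippet may differ in detail). The lookup helper, its delays, and its string results are made up for illustration.

```fsharp
// Hypothetical lookup: waits delayMs, then yields the given outcome.
// None stands for "completed without a satisfactory result".
let lookup (delayMs: int) (outcome: string option) : Async<string option> =
    async {
        do! Async.Sleep delayMs
        return outcome }

let firstHit : string option =
    [ lookup 50 None               // finishes first, but found nothing
      lookup 300 (Some "mirror")   // first successful result
      lookup 5000 (Some "slow") ]  // still running when the result arrives
    |> Async.Choice                // assumes FSharp.Core 4.0 or later
    |> Async.RunSynchronously

printfn "%A" firstHit
```

The child that returns None does not end the race; only a Some value does. If every child returned None, the combined workflow would return None as well.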

eirik answered Nov 20 '22 03:11


The simplest possible implementation I could think of looks something like this:

open System
open FSharp.Control

let getOneOrOther () =
    let queue = BlockingQueueAgent(1)
    let async1 = async {
            do! Async.Sleep (System.Random().Next(1000, 2000))
            do! queue.AsyncAdd(1) } |> Async.Start
    let async2 = async {
            do! Async.Sleep (System.Random().Next(1000, 2000))
            do! queue.AsyncAdd(2) } |> Async.Start

    queue.Get()

for i in 1..10 do
    printfn "%d" <| getOneOrOther ()

Console.ReadLine () |> ignore

It relies on the blocking queue implementation from the FSharpx project, which you will probably want for other reasons anyway. If you don't want any dependencies, System.Collections.Concurrent also includes a blocking queue (BlockingCollection<T>), with a slightly less nice interface.
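
A dependency-free variant might look like the sketch below, which swaps BlockingQueueAgent for System.Collections.Concurrent.BlockingCollection<T>. The names firstOf and delayed are made up for illustration. Like the version above, it blocks the calling thread until a result arrives and does not cancel the slower workflows.

```fsharp
open System.Collections.Concurrent

// Hypothetical helper: start every workflow and block until the first result.
let firstOf (asyncs: seq<Async<'T>>) : 'T =
    // Unbounded, so slower workflows can still Add after we have returned.
    // Deliberately not disposed, because a late Add would otherwise throw.
    let queue = new BlockingCollection<'T>()
    for work in asyncs do
        async {
            let! result = work
            queue.Add(result) }
        |> Async.Start
    // Take blocks the calling thread until the first item arrives.
    queue.Take()

// Hypothetical workflow that yields a value after a fixed delay.
let delayed (ms: int) (value: int) : Async<int> =
    async {
        do! Async.Sleep ms
        return value }

let winner = firstOf [ delayed 500 1; delayed 50 2 ]
printfn "%d" winner
```

Take is a plain blocking call, so this form is best kept out of async workflows; inside one, a non-blocking combinator is the better fit.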

For a more general version with cancellation built in, the version below takes a Seq<unit -> Async<'T>> and returns the first result to come through, cancelling all the others.

open System
open System.Threading
open FSharp.Control

let async1 () = async {
        do! Async.Sleep (System.Random().Next(1000, 2000))
        return 1 }
let async2 () = async {
        do! Async.Sleep (System.Random().Next(1000, 2000))
        return 2 }

let getFirst asyncs =
    let queue = BlockingQueueAgent(1)
    let doWork operation = async { 
            let! result = operation()
            do! queue.AsyncAdd(result) }
    let start work =
        let cts = new CancellationTokenSource()
        Async.Start(work, cts.Token)
        cts
    let cancellationTokens =
        asyncs
        |> Seq.map doWork
        |> Seq.map start
        |> Seq.toList // Seq.map is lazy: force it so every workflow starts before we wait
    let result = queue.Get()
    cancellationTokens
    |> Seq.iter (fun cts -> cts.Cancel(); cts.Dispose())
    result

for i in 1..10 do
    printfn "%A" <| getFirst [async1;async2]

Console.ReadLine () |> ignore

mavnn answered Nov 20 '22 04:11