
Select First Async Result

Is there an async operator to get the value first returned by two asynchronous values (Async<_>)?

For example, given two Async<_> values where A1 returns after 1 second and A2 returns after 2 seconds, I want the result from A1.

The reason is I want to implement an interleave function for asynchronous sequences, so that if there are two asynchronous sequences "defined" like this (with space indicating time as with marble diagrams):

S1 = -+-----+------------+----+
S2 = ---+-------+----------+-----+

Then I want to generate a new asynchronous sequence that acts like this:

S3 = -+-+---+---+--------+-+--+--+

Interleave S1 S2 = S3

But to do that, I probably need some kind of async select operator that picks whichever value arrives first.

I think this would be like "select" in Go, where you can take the first available value from two channels.

TPL has a function called Task.WhenAny - I probably need something similar here.

Bent Rasmussen, asked Jun 29 '26 16:06


2 Answers

I don't think the operator is available in the F# library. To combine this from existing operations, you could use Async.StartAsTask and then use the existing Task.WhenAny operator. However, I'm not exactly sure how that would behave with respect to cancellation.
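
As a rough illustration of the Task.WhenAny route, here is a minimal sketch. The helper name firstOf is hypothetical (it is not part of FSharp.Core), and the linked CancellationTokenSource is just one possible way to ask the slower workflow to stop once a winner is known. Note that Task.WhenAny yields the first workflow to finish, even if it finished by failing.

```fsharp
open System.Threading
open System.Threading.Tasks

/// Hypothetical helper (an assumption, not a library function): returns the
/// outcome of whichever workflow finishes first, using Task.WhenAny.
let firstOf (a: Async<'T>) (b: Async<'T>) : Async<'T> =
    async {
        // Link to the caller's token so cancelling the caller cancels both.
        let! ct = Async.CancellationToken
        use cts = CancellationTokenSource.CreateLinkedTokenSource ct
        let ta = Async.StartAsTask(a, cancellationToken = cts.Token)
        let tb = Async.StartAsTask(b, cancellationToken = cts.Token)
        // WhenAny completes with the first task to finish (success or failure).
        let! (winner: Task<'T>) = Task.WhenAny<'T>([| ta; tb |]) |> Async.AwaitTask
        // Request cancellation of the slower workflow; this is cooperative.
        cts.Cancel()
        return! Async.AwaitTask winner
    }

// Usage: A1 finishes well before A2, so its value is the one returned.
let a1 = async {
    do! Async.Sleep 100
    return "A1" }
let a2 = async {
    do! Async.Sleep 1000
    return "A2" }

let first = firstOf a1 a2 |> Async.RunSynchronously
printfn "%s" first
```

The slower workflow only stops if it reaches a cancellation check (Async.Sleep does), so long-running CPU-bound work would keep going until it next checks the token.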

The other option is to use the Async.Choose operator published on the F# Snippets website. It is not particularly elegant, but it should do the trick. To make the answer stand-alone, the code is included below.

open System

type Microsoft.FSharp.Control.Async with
  /// Creates an asynchronous workflow that non-deterministically returns the
  /// result of one of the two specified workflows (the one that completes
  /// first). This is similar to Task.WaitAny.
  static member Choose(a, b) : Async<'T> =
    Async.FromContinuations(fun (cont, econt, ccont) ->
      // Results from the two workflows (Choice1Of3 = not finished yet,
      // Choice2Of3 = value, Choice3Of3 = exception)
      let result1 = ref (Choice1Of3())
      let result2 = ref (Choice1Of3())
      let handled = ref false
      let lockObj = new obj()
      let synchronized f = lock lockObj f

      // Called when one of the workflows completes
      let complete () = 
        let op =
          synchronized (fun () ->
            // If we already handled result (and called continuation)
            // then ignore. Otherwise, if the computation succeeds, then
            // run the continuation and mark state as handled.
            // Only throw if both workflows failed.
            match !handled, !result1, !result2 with 
            | true, _, _ -> ignore
            | false, (Choice2Of3 value), _ 
            | false, _, (Choice2Of3 value) -> 
                handled := true
                (fun () -> cont value)
            | false, Choice3Of3 e1, Choice3Of3 e2 -> 
                handled := true; 
                (fun () -> 
                    econt (new AggregateException
                                ("Both clauses of a choice failed.", [| e1; e2 |])))
            | false, Choice1Of3 _, Choice3Of3 _ 
            | false, Choice3Of3 _, Choice1Of3 _ 
            | false, Choice1Of3 _, Choice1Of3 _ -> ignore )
        op() 

      // Run a workflow and write its result (or exception) to a ref cell
      let run resCell workflow = async {
        try
          let! res = workflow
          synchronized (fun () -> resCell := Choice2Of3 res)
        with e ->
          synchronized (fun () -> resCell := Choice3Of3 e)
        complete() }

      // Start both work items in thread pool
      Async.Start(run result1 a)
      Async.Start(run result2 b) )
Tomas Petricek, answered Jul 01 '26 10:07


Tomas already answered the precise question. However, you might be interested to know that my Hopac library for F# directly supports Concurrent ML-style first-class, higher-order, selective events, called alternatives. They directly provide a choose combinator and are a more expressive concurrency abstraction mechanism than Go's select statement.

Regarding your more specific problem of interleaving two asynchronous sequences, I recently started experimenting with ideas on how Rx-style programming could be done with Hopac. One potential approach I came up with is to define a kind of ephemeral event streams. You can find the experimental code here:

  • Alts.fsi
  • Alts.fs

As you can see, one of the operations defined for event streams is merge. What you are looking for may be slightly different semantically, but would likely be straightforward to implement using Hopac-style alternatives (or Concurrent ML-style events).

