This is the same as How do I await a response from an RX Subject without introducing a race condition?, but in F#.
The C# solution looks like:
static async void Foo()
{
    var subject = new Subject<int>();
    var firstInt = subject.FirstAsync().PublishLast();
    firstInt.Connect();
    subject.OnNext(42);
    var x = await firstInt;
    Console.WriteLine("Done waiting: " + x);
}
My attempt in F# is this:
let foo () =
    async {
        use subject = new Subject<int>()
        let firstInt = subject.FirstAsync().PublishLast()
        firstInt.Connect() |> ignore
        subject.OnNext(42)
        let! x = firstInt
        printfn "Done waiting: %d" x
        return ()
    }
The let x! = firstInt gives the compile error This expression was expected to have type Async<'a> but here has type IConnectableObservable<int> so apparently C# does something under the hood that F# doesn't.
Is there a C# implicit interface cast at work here, that I need to do explicitly in F#? If so, I can't figure out what it is.
After further digging, it seems that C# calls GetAwaiter() under the hood when you await something. For a Subject or an IObservable, GetAwaiter returns an AsyncSubject, which isn't immediately useful in F#, but the ToTask extension method in System.Reactive.Threading.Tasks makes it useful. Apparently, you can apply ToTask directly to a Subject (or an IObservable) without going by way of GetAwaiter, so my problem is solved by changing the let! x ... statement to:
    let! x = firstInt.ToTask() |> Async.AwaitTask
edit:
Using FSharpx.Async is a much better way of accomplishing the same thing:
open FSharpx.Control.Observable
let foo () =
    async {
        use subject = new Subject<int>()
        subject.OnNext(42)
        let! x = Async.AwaitObservable subject
        printfn "Done waiting: %d" x
        return ()
    }
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With