Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to wait for an observable to complete?

In Node.js, you can set up a server and the process won't terminate as long as the server is alive and active in the event loop. I am wondering if it would be possible to do something like that using reactive extensions? I know how to set up a named pipe and make use of it to communicate with the Node server in another process, but I am not sure how to keep the program from terminating via anything other than waiting for a keypress. I'd like to implement that server as a part of a reactive pipeline. Would it be possible to block the main thread until the pipeline completes?

like image 971
Marko Grdinić Avatar asked Oct 16 '25 13:10

Marko Grdinić


2 Answers

open System
open System.Threading
open FSharp.Control.Reactive

module Observable = 
    /// Blocks the thread until the observable completes. 
    let waitUnit on_next (o : IObservable<_>) =
        use t = new ManualResetEventSlim()
        use __ = Observable.subscribeWithCompletion on_next (fun _ -> t.Set()) o
        t.Wait()

Observable.interval (TimeSpan.FromSeconds(1.0))
|> Observable.take 3
|> Observable.waitUnit (printfn "%i...")

printfn "Done."

The above is using the F# Rx bindings, but the C# version would be similar. The library itself has Observable.wait which is of type IObservable<'a> -> 'a, but that one has the disadvantage of throwing an exception on empty sequences and keeping the latest value in memory until the observable is disposed. This one has the right semantics if the final value is not important.

I've looked under the hood and wait uses ManualResetEventSlim to block the thread it is on, so this should be fine.

like image 93
Marko Grdinić Avatar answered Oct 19 '25 03:10

Marko Grdinić


I'm sorry, but I can't give an answer in F#, but in C# this is quite simple.

If I had this example:

Observable
    .Range(0, 10)
    .Subscribe(x => Console.WriteLine(x));

...and I wanted to wait until it completed, I could rewrite it like this:

    Observable
        .Range(0, 10)
        .Do(x => Console.WriteLine(x))
        .ToArray()
        .Wait();

...or:

    await Observable
        .Range(0, 10)
        .Do(x => Console.WriteLine(x))
        .ToArray();

As Marko pointed out in the comments, this approach creates an array that may be quite large. To combat this you can replace .ToArray() with .LastAsync() (which was named before the general use of Async to indicate a Task - in this case it just returns the last element of the observable as a IObservable<T>).

The .Wait() does throw on an empty observable, but the await version does not.

like image 22
Enigmativity Avatar answered Oct 19 '25 03:10

Enigmativity



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!