Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

'WaitFor' an observable

I'm in a situation where I have a list of Tasks that I'm working through (enable drive, change position, wait for stop, disable).

The 'wait for' monitors an IObservable<Status>, which I want to wait on (so I can thread it through ContinueWith and the other tasks).

I started out with the following tasks inside the OnNext handling of the subscriber, but that was just ugly. What I've now come up with is this extension method:

public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
    var tcs = new TaskCompletionSource<T>();
    source
        .Where(pred)
        .DistinctUntilChanged()
        .Take(1)  //OnCompletes the observable, subscription will self-dispose
        .Subscribe(val => tcs.TrySetResult(val),
                    ex => tcs.TrySetException(ex),
                    () => tcs.TrySetCanceled());

    return tcs.Task;
}

(UPDATED with svick's suggestion of handling OnCompleted and OnError)

Questions:

  • Is this good, bad, or ugly?
  • Did I miss an existing extension which could have done this?
  • Are the Where and DistinctUntilChanged in the right order? (I think they are)
like image 638
Benjol Avatar asked Jul 30 '12 07:07

Benjol


2 Answers

At the very least I would change this extension method to be this:

public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
    return
        source
            .Where(pred)
            .DistinctUntilChanged()
            .Take(1)
            .ToTask();
}

Using .ToTask() is much better than to introduce TaskCompletionSource. You require a reference to the System.Reactive.Threading.Tasks namespace to get the .ToTask() extension method.

Also, DistinctUntilChanged is redundant in this code. You only ever get one value so it must be distinct by default.

Now, my next suggestion may be a little controversial. This extension is a bad idea because it hides the true semantics of what is going on.

If I had these two snippits of code:

var t = xs.WaitFor(x => x > 10);

Or:

var t = xs.Where(x => x > 10).Take(1).ToTask();

I would generally prefer the second snippit as it clearly shows me what is going on - I don't need to remember the semantics of WaitFor.

Unless you made the name of WaitFor more descriptive - perhaps TakeOneAsTaskWhere - then you are taking the clarity of using the operators out of the code that uses it and making the code harder to manage.

Doesn't the following make it easier to remember the semantics?

var t = xs.TakeOneAsTaskWhere(x => x > 10);

The bottom-line for me is that Rx operators are meant to be composed, not encapsulated, but if you're going to encapsulate them then their meaning must be clear.

I hope this helps.

like image 142
Enigmativity Avatar answered Oct 14 '22 08:10

Enigmativity


Not 100% sure about this, but from reading the Rx 2.0 beta blog post, I would think that if you can use async/await, you could "return await source.FirstAsync(pred)" or without async, "return source.FirstAsync(pred).ToTask()"

http://blogs.msdn.com/b/rxteam/archive/2012/03/12/reactive-extensions-v2-0-beta-available-now.aspx

sshot of linqpad using rx 2.0 and firstasync

like image 23
James Manning Avatar answered Oct 14 '22 08:10

James Manning