I'm in a situation where I have a list of Tasks that I'm working through (enable drive, change position, wait for stop, disable).
The 'wait for' step monitors an IObservable<Status>, which I want to wait on (so I can thread it through ContinueWith and the other tasks).
I started out with the following tasks inside the OnNext handling of the subscriber, but that was just ugly. What I've now come up with is this extension method:
public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
    var tcs = new TaskCompletionSource<T>();
    source
        .Where(pred)
        .DistinctUntilChanged()
        .Take(1) // completes the observable, so the subscription disposes itself
        .Subscribe(val => tcs.TrySetResult(val),
                   ex => tcs.TrySetException(ex),
                   () => tcs.TrySetCanceled());
    return tcs.Task;
}
(UPDATED with svick's suggestion of handling OnCompleted and OnError)
Are Where and DistinctUntilChanged in the right order? (I think they are.)

At the very least I would change this extension method to be this:
public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
    return
        source
            .Where(pred)
            .DistinctUntilChanged()
            .Take(1)
            .ToTask();
}
Using .ToTask() is much better than introducing a TaskCompletionSource. You need a using directive for the System.Reactive.Threading.Tasks namespace to get the .ToTask() extension method.
Also, DistinctUntilChanged is redundant in this code. Take(1) only ever lets one value through, so that value is distinct by default.
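As a minimal sketch of the simplified version with DistinctUntilChanged removed, something like the following should work. The class name WaitForSketch, the Demo method, and the use of Subject<int> as a stand-in for the real status stream are assumptions made for illustration only; they are not from the original question.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks; // brings the ToTask() extension into scope
using System.Threading.Tasks;

public static class WaitForSketch
{
    // Same shape as the extension method above, minus DistinctUntilChanged.
    public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        return source.Where(pred).Take(1).ToTask();
    }

    // Hypothetical demo: a Subject<int> plays the role of the status stream.
    public static int Demo()
    {
        var xs = new Subject<int>();
        Task<int> t = xs.WaitFor(x => x > 10);

        xs.OnNext(5);   // fails the predicate, the task stays pending
        xs.OnNext(12);  // first match: Take(1) completes and the task gets this value
        xs.OnNext(20);  // arrives after the subscription has been disposed

        return t.Result;
    }
}
```

One behavioural difference worth checking before swapping the two versions: if the source completes without ever matching, I believe ToTask faults the task with an InvalidOperationException, whereas the TaskCompletionSource version above cancels it.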
Now, my next suggestion may be a little controversial. This extension is a bad idea because it hides the true semantics of what is going on.
If I had these two snippets of code:
var t = xs.WaitFor(x => x > 10);
Or:
var t = xs.Where(x => x > 10).Take(1).ToTask();
I would generally prefer the second snippet, as it clearly shows me what is going on. I don't need to remember the semantics of WaitFor.
Unless you made the name of WaitFor more descriptive (perhaps TakeOneAsTaskWhere), you are taking the clarity of the operators out of the calling code and making that code harder to manage.
Doesn't the following make it easier to remember the semantics?
var t = xs.TakeOneAsTaskWhere(x => x > 10);
The bottom line for me is that Rx operators are meant to be composed, not encapsulated, but if you're going to encapsulate them then their meaning must be clear.
I hope this helps.
Not 100% sure about this, but from reading the Rx 2.0 beta blog post, I would think that if you can use async/await, you could write "return await source.FirstAsync(pred)", or without async, "return source.FirstAsync(pred).ToTask()".
http://blogs.msdn.com/b/rxteam/archive/2012/03/12/reactive-extensions-v2-0-beta-available-now.aspx
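The two variants suggested in this answer might look roughly like this. It is only a sketch: the class name FirstAsyncSketch, the method names WaitForAsync and WaitForTask, and the Subject<int> used in Demo are hypothetical, and it assumes an Rx version (2.0 or later) where FirstAsync and awaitable observables are available.

```csharp
using System;
using System.Reactive.Linq;            // FirstAsync and the GetAwaiter extension
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks; // ToTask
using System.Threading.Tasks;

public static class FirstAsyncSketch
{
    // With async/await: awaiting the observable subscribes to it and
    // resumes once FirstAsync has produced its single value.
    public static async Task<T> WaitForAsync<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        return await source.FirstAsync(pred);
    }

    // Without async: convert the single-element observable to a task.
    public static Task<T> WaitForTask<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        return source.FirstAsync(pred).ToTask();
    }

    // Hypothetical demo with a Subject<int> standing in for the status stream.
    public static int Demo()
    {
        var xs = new Subject<int>();
        Task<int> viaAwait = xs.WaitForAsync(x => x > 10);
        Task<int> viaToTask = xs.WaitForTask(x => x > 10);

        xs.OnNext(3);  // no match for either task
        xs.OnNext(42); // first value satisfying the predicate

        return viaAwait.Result + viaToTask.Result;
    }
}
```

Blocking on .Result is only reasonable in a console-style demo like this one; in the task chain from the question the returned task would be passed to ContinueWith or awaited instead.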