Is there a convenient way to use an async function as the predicate of a Where
operator on an observable?
For example, if I have a nice tidy but possibly long-running function defined like this:
Task<int> Rank(object item);
Is there a trick to passing it to Where
and maintaining the asynchronous execution? As in:
myObservable.Where(async item => (await Rank(item)) > 5)
In the past, when I've needed to do this, I've resorted to SelectMany,
projecting each result into a new type alongside the original value and then filtering on that:
myObservable.SelectMany(async item => new
    {
        ShouldInclude = (await Rank(item)) > 5,
        Item = item
    })
    .Where(o => o.ShouldInclude)
    .Select(o => o.Item);
I think that's terribly unreadable, though, and I feel like there must be a cleaner way.
> I think that's terribly unreadable
Yes, but you can fix that by encapsulating it into a helper method. If you call it Where, you will get exactly the syntax you wanted:
public static IObservable<T> Where<T>(
    this IObservable<T> source, Func<T, Task<bool>> predicate)
{
    return source.SelectMany(async item => new
        {
            ShouldInclude = await predicate(item),
            Item = item
        })
        .Where(x => x.ShouldInclude)
        .Select(x => x.Item);
}
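To show the helper giving the syntax from the question, here is a minimal, self-contained sketch. It assumes the System.Reactive NuGet package is referenced. The `Rank` implementation, the class names, and the `Observable.Range` source are hypothetical stand-ins for illustration, not part of the original question.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class AsyncWhereExtensions
{
    // Same helper as above: run the async predicate, then keep matching items.
    public static IObservable<T> Where<T>(
        this IObservable<T> source, Func<T, Task<bool>> predicate)
    {
        return source.SelectMany(async item => new
            {
                ShouldInclude = await predicate(item),
                Item = item
            })
            .Where(x => x.ShouldInclude)
            .Select(x => x.Item);
    }
}

public static class AsyncWhereDemo
{
    // Hypothetical stand-in for the question's Rank: waits briefly, then
    // returns the item's own integer value as its rank.
    private static async Task<int> Rank(object item)
    {
        await Task.Delay(10);
        return (int)item;
    }

    // Filters 1..10 with the async predicate. The result is sorted before
    // returning because SelectMany emits in task-completion order.
    public static async Task<int[]> RunAsync()
    {
        var kept = await Observable.Range(1, 10)
            .Where(async item => await Rank(item) > 5)
            .ToList();
        return kept.OrderBy(x => x).ToArray();
    }

    public static async Task Main()
    {
        // prints "6, 7, 8, 9, 10"
        Console.WriteLine(string.Join(", ", await RunAsync()));
    }
}
```

The async lambda can only bind to the `Func<T, Task<bool>>` overload, so it does not clash with the built-in `Where` from `System.Reactive.Linq`.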
Alternatively, you could use something like this:
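public static IObservable<T> Where<T>(
    this IObservable<T> source, Func<T, Task<bool>> predicate)
{
    // ToObservable() on a Task needs: using System.Reactive.Threading.Tasks;
    // The inner operator must be SelectMany (not Select) so the result is
    // flattened to IObservable<T> rather than IObservable<IObservable<T>>.
    return source.SelectMany(item =>
        predicate(item).ToObservable()
            .SelectMany(include => include ? Observable.Return(item) : Observable.Empty<T>())
    );
}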
Which is basically how Where works, if you defined it in terms of SelectMany:
public static IObservable<T> Where<T>(
    this IObservable<T> source, Func<T, bool> predicate)
{
    return source.SelectMany(item =>
        predicate(item) ? Observable.Return(item) : Observable.Empty<T>());
}
Actually though, @svick's answer has fewer closures. :)
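One caveat with the SelectMany-based async versions: SelectMany emits each item when its predicate task completes, so items may come out in a different order than they went in. If source order matters, one possible variation (a sketch, not something from the answers above) is to wrap each predicate call in a deferred observable and flatten with Concat, which runs the predicates one at a time. The name `WhereInOrder`, the demo classes, and the `Rank` implementation are hypothetical; the System.Reactive package is assumed.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class OrderedAsyncWhereExtensions
{
    // Hypothetical helper name; not part of Rx. Each predicate starts only
    // after the previous one has finished, so source order is kept at the
    // cost of concurrency.
    public static IObservable<T> WhereInOrder<T>(
        this IObservable<T> source, Func<T, Task<bool>> predicate)
    {
        return source
            .Select(item => Observable.FromAsync(() => predicate(item))
                .Where(include => include)
                .Select(_ => item))
            .Concat();
    }
}

public static class OrderedAsyncWhereDemo
{
    // Hypothetical Rank: larger values finish sooner, which can reorder
    // the output of a SelectMany-based filter.
    private static async Task<int> Rank(object item)
    {
        int value = (int)item;
        await Task.Delay(10 * (11 - value));
        return value;
    }

    // Filters 1..10 and returns the survivors in the order they were emitted.
    public static async Task<int[]> RunAsync()
    {
        var kept = await Observable.Range(1, 10)
            .WhereInOrder(async item => await Rank(item) > 5)
            .ToList();
        return kept.ToArray();
    }

    public static async Task Main()
    {
        // prints "6, 7, 8, 9, 10"
        Console.WriteLine(string.Join(", ", await RunAsync()));
    }
}
```

The trade-off is throughput: with Concat the predicates no longer overlap, so this only makes sense when ordering is worth more than concurrency.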