Observable.Where with async predicate

Is there a convenient way to use an async function as the predicate of a Where operator on an observable?

For example, if I have a nice tidy but possibly long-running function defined like this:

Task<int> Rank(object item);

Is there a trick to passing it to Where and maintaining the asynchronous execution? As in:

myObservable.Where(async item => (await Rank(item)) > 5)

In the past, when I've needed to do this, I've resorted to SelectMany: I project each result into a new type alongside the original value, then filter on that.

myObservable.SelectMany(async item => new 
  {
    ShouldInclude = (await Rank(item)) > 5,
    Item = item
  })
  .Where(o => o.ShouldInclude)
  .Select(o => o.Item);

I think that's terribly unreadable, though, and I feel like there must be a cleaner way.

MojoFilter asked Aug 13 '14 21:08


2 Answers

"I think that's terribly unreadable"

Yes, but you can fix that by encapsulating it into a helper method. If you call it Where, you will get exactly the syntax you wanted:

public static IObservable<T> Where<T>(
    this IObservable<T> source, Func<T, Task<bool>> predicate)
{
    return source.SelectMany(async item => new 
        {
            ShouldInclude = await predicate(item),
            Item = item
        })
        .Where(x => x.ShouldInclude)
        .Select(x => x.Item);
}
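
As a rough sketch of how the helper might be used, the console program below repeats it so the example compiles on its own. It assumes the System.Reactive package is referenced; the Rank body here is a hypothetical stand-in (it waits briefly and returns the item as its own rank), not the asker's real function.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class AsyncWhereExtensions
{
    // The helper from this answer, repeated so the sketch is self-contained.
    public static IObservable<T> Where<T>(
        this IObservable<T> source, Func<T, Task<bool>> predicate)
    {
        return source.SelectMany(async item => new
            {
                ShouldInclude = await predicate(item),
                Item = item
            })
            .Where(x => x.ShouldInclude)   // binds to Rx's synchronous Where
            .Select(x => x.Item);
    }
}

public static class Program
{
    // Hypothetical stand-in for the question's Rank: the rank is the item itself.
    public static async Task<int> Rank(object item)
    {
        await Task.Delay(10);
        return (int)item;
    }

    public static async Task Main()
    {
        // The async lambda only matches the Func<T, Task<bool>> overload above.
        IList<object> kept = await Observable.Range(1, 10)
            .Select(i => (object)i)
            .Where(async item => (await Rank(item)) > 5)
            .ToList();

        // Five items (6 through 10) pass the filter.
        Console.WriteLine(kept.Count);
    }
}
```

One thing to keep in mind: SelectMany emits each item when its task completes, so the filtered items can arrive in a different order than the source produced them.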

svick answered Nov 06 '22 07:11


Alternatively, you could use something like this:

public static IObservable<T> Where<T>(
    this IObservable<T> source, Func<T, Task<bool>> predicate)
{
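    // ToObservable() on a Task needs: using System.Reactive.Threading.Tasks;
    return source.SelectMany(item =>
        predicate(item).ToObservable()
            // SelectMany (not Select) flattens the inner observable back to IObservable<T>
            .SelectMany(include => include ? Observable.Return(item) : Observable.Empty<T>())
        );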
}

This is basically how Where would work if you defined it in terms of SelectMany:

public static IObservable<T> Where<T>(
    this IObservable<T> source, Func<T, bool> predicate)
{
    return source.SelectMany(item => predicate(item) ? Observable.Return(item) : Observable.Empty<T>());
}

Actually, though, @svick's answer has fewer closures. :)
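
To see the SelectMany-based definition of the synchronous Where next to the built-in one, here is a small sketch that assumes the System.Reactive package. The name WhereViaSelectMany is hypothetical, picked only so the method cannot clash with Rx's own Where overload of the same signature.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class SelectManyWhereDemo
{
    // Hypothetical name; same body as the synchronous definition above.
    public static IObservable<T> WhereViaSelectMany<T>(
        this IObservable<T> source, Func<T, bool> predicate)
    {
        // Each item becomes a one-element or empty sequence, and SelectMany flattens them.
        return source.SelectMany(item =>
            predicate(item) ? Observable.Return(item) : Observable.Empty<T>());
    }

    public static void Main()
    {
        IObservable<int> source = Observable.Range(1, 10);

        // Wait() blocks until the sequence completes and returns its last element.
        IList<int> viaSelectMany = source.WhereViaSelectMany(i => i > 5).ToList().Wait();
        IList<int> builtIn = source.Where(i => i > 5).ToList().Wait();

        // Compare the two result lists element by element.
        Console.WriteLine(viaSelectMany.SequenceEqual(builtIn));
    }
}
```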

cwharris answered Nov 06 '22 06:11