
RX Observable.TakeWhile checks condition BEFORE each element but I need to perform the check after

Observable.TakeWhile allows you to run a sequence as long as a condition is true (using a delegate so we can perform computations on the actual sequence objects), but it's checking this condition BEFORE each element. How can I perform the same check but AFTER each element?

The following code demonstrates the problem

    void RunIt()
    {
        List<SomeCommand> listOfCommands = new List<SomeCommand>();
        listOfCommands.Add(new SomeCommand { CurrentIndex = 1, TotalCount = 3 });
        listOfCommands.Add(new SomeCommand { CurrentIndex = 2, TotalCount = 3 });
        listOfCommands.Add(new SomeCommand { CurrentIndex = 3, TotalCount = 3 });

        var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex != c.TotalCount);

        obs.Subscribe(x =>
        {
            Debug.WriteLine("{0} of {1}", x.CurrentIndex, x.TotalCount);
        });
    }

    class SomeCommand
    {
        public int CurrentIndex;
        public int TotalCount;
    }

This outputs

1 of 3
2 of 3

I can't get the third element

Looking at this example, you may think all I have to do is change my condition like so -

var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex <= c.TotalCount);

But then the observable will never complete (because in my real world code, the stream doesn't end after those three commands)

NoPyGod asked Feb 04 '13 23:02


3 Answers

There is no built-in operator that does what you're asking, but here's one that uses Publish to run two queries while subscribing to the underlying observable only once:

// Emits matching values, but includes the value that failed the filter
public static IObservable<T> TakeWhileInclusive<T>(
    this IObservable<T> source, Func<T, bool> predicate)
{
    return source.Publish(co => co.TakeWhile(predicate)
        .Merge(co.SkipWhile(predicate).Take(1)));
}

And then:

var obs = listOfCommands.ToObservable()
    .TakeWhileInclusive(c => c.CurrentIndex != c.TotalCount);
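
To try the operator end to end, here is a minimal sketch that reuses the SomeCommand class from the question. The fourth command and the class names TakeWhileInclusiveExtensions and Program are assumptions added for illustration (the extra command stands in for a stream that keeps going), and the code assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public class SomeCommand
{
    public int CurrentIndex;
    public int TotalCount;
}

public static class TakeWhileInclusiveExtensions
{
    // Emits matching values, plus the first value that fails the predicate.
    public static IObservable<T> TakeWhileInclusive<T>(
        this IObservable<T> source, Func<T, bool> predicate)
    {
        // Publish shares a single subscription to source between both queries.
        return source.Publish(co => co.TakeWhile(predicate)
            .Merge(co.SkipWhile(predicate).Take(1)));
    }
}

public static class Program
{
    public static void Main()
    {
        var commands = new List<SomeCommand>
        {
            new SomeCommand { CurrentIndex = 1, TotalCount = 3 },
            new SomeCommand { CurrentIndex = 2, TotalCount = 3 },
            new SomeCommand { CurrentIndex = 3, TotalCount = 3 },
            // Hypothetical extra command: the stream does not end after three.
            new SomeCommand { CurrentIndex = 1, TotalCount = 5 },
        };

        commands.ToObservable()
            .TakeWhileInclusive(c => c.CurrentIndex != c.TotalCount)
            .Subscribe(
                c => Console.WriteLine("{0} of {1}", c.CurrentIndex, c.TotalCount),
                () => Console.WriteLine("completed"));
    }
}
```

With this input the subscriber should see the first three commands followed by the completion callback, and the fourth command is never relayed.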
Richard Szalay answered Nov 13 '22 11:11


Final edit:

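I based my solution on Sergey's TakeWhileInclusive implementation in this thread: How to complete a Rx Observable depending on a condition in a event. Note that the predicate here is the stop condition: the sequence completes right after the first element for which it returns true.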

public static IObservable<TSource> TakeUntil<TSource>(
    this IObservable<TSource> source, Func<TSource, bool> predicate)
{
    return Observable.Create<TSource>(o => source.Subscribe(
        x =>
        {
            // Relay the element first, then check whether it is the last one.
            o.OnNext(x);
            if (predicate(x))
                o.OnCompleted();
        },
        o.OnError,
        o.OnCompleted));
}
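
A usage sketch for this approach, applied to the data from the question. The operator is renamed to the hypothetical TakeUntilInclusive here so that it cannot clash with any TakeUntil overload the installed Rx version may already provide. The class names and the fourth command are also assumptions for illustration. The lambda is the inverse of the one passed to TakeWhile, because it describes when to stop.

```csharp
using System;
using System.Reactive.Linq;

public class SomeCommand
{
    public int CurrentIndex;
    public int TotalCount;
}

public static class StopAfterExtensions
{
    // Same idea as the answer's operator; only the name is hypothetical.
    public static IObservable<TSource> TakeUntilInclusive<TSource>(
        this IObservable<TSource> source, Func<TSource, bool> stopPredicate)
    {
        return Observable.Create<TSource>(o => source.Subscribe(
            x =>
            {
                o.OnNext(x);              // relay the element first
                if (stopPredicate(x))     // then decide whether it was the last
                    o.OnCompleted();
            },
            o.OnError,
            o.OnCompleted));
    }
}

public static class Program
{
    public static void Main()
    {
        var commands = new[]
        {
            new SomeCommand { CurrentIndex = 1, TotalCount = 3 },
            new SomeCommand { CurrentIndex = 2, TotalCount = 3 },
            new SomeCommand { CurrentIndex = 3, TotalCount = 3 },
            // Hypothetical extra command standing in for the rest of the stream.
            new SomeCommand { CurrentIndex = 1, TotalCount = 5 },
        };

        commands.ToObservable()
            .TakeUntilInclusive(c => c.CurrentIndex == c.TotalCount)
            .Subscribe(
                c => Console.WriteLine("{0} of {1}", c.CurrentIndex, c.TotalCount),
                () => Console.WriteLine("completed"));
    }
}
```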
NoPyGod answered Nov 13 '22 09:11


You can use the TakeUntil operator to take every item until a secondary source produces a value. In this case the second stream is the first value that arrives after the element that fails the predicate:

public static IObservable<TSource> TakeWhileInclusive<TSource>(
    this IObservable<TSource> source,
    Func<TSource, bool> predicate)
{
    return source.TakeUntil(source.SkipWhile(x => predicate(x)).Skip(1));
}
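
One thing worth checking with this variant is when completion happens. Because of the Skip(1), the stop signal is the element after the one that fails the predicate, and source is subscribed to twice. The sketch below makes the timing visible with a Subject<int>; the subject, the class names, and the "pushed 3" marker are assumptions for illustration and not part of the answer.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class TakeUntilVariantExtensions
{
    public static IObservable<TSource> TakeWhileInclusive<TSource>(
        this IObservable<TSource> source,
        Func<TSource, bool> predicate)
    {
        // source is used twice, so it is subscribed to twice.
        return source.TakeUntil(source.SkipWhile(x => predicate(x)).Skip(1));
    }
}

public static class Program
{
    public static void Main()
    {
        var subject = new Subject<int>();

        subject.TakeWhileInclusive(x => x != 3)
            .Subscribe(
                x => Console.WriteLine("value {0}", x),
                () => Console.WriteLine("completed"));

        subject.OnNext(1);
        subject.OnNext(2);
        subject.OnNext(3);                 // fails the predicate; relayed, no completion yet
        Console.WriteLine("pushed 3");     // marker to show where completion falls
        subject.OnNext(4);                 // the Skip(1) stream fires here and ends the sequence
    }
}
```

In this sketch "completed" can only appear after the "pushed 3" marker, so if the subscriber must be notified as soon as the last command arrives, a variant that completes on the failing element itself may be a better fit.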
Alex answered Nov 13 '22 09:11