Observable.TakeWhile allows you to run a sequence as long as a condition is true (using a delegate so we can perform computations on the actual sequence objects), but it's checking this condition BEFORE each element. How can I perform the same check but AFTER each element?
The following code demonstrates the problem
void RunIt()
{
List<SomeCommand> listOfCommands = new List<SomeCommand>();
listOfCommands.Add(new SomeCommand { CurrentIndex = 1, TotalCount = 3 });
listOfCommands.Add(new SomeCommand { CurrentIndex = 2, TotalCount = 3 });
listOfCommands.Add(new SomeCommand { CurrentIndex = 3, TotalCount = 3 });
var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex != c.TotalCount);
obs.Subscribe(x =>
{
Debug.WriteLine("{0} of {1}", x.CurrentIndex, x.TotalCount);
});
}
class SomeCommand
{
public int CurrentIndex;
public int TotalCount;
}
This outputs
1 of 3
2 of 3
I can't get the third element
Looking at this example, you may think all I have to do is change my condition like so -
var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex <= c.TotalCount);
But then the observable will never complete (because in my real world code, the stream doesn't end after those three commands)
There's no built in operators to do what you're asking, but here's one that uses Publish
to run two queries while only subscribing to the underlying observable once:
// Emits matching values, but includes the value that failed the filter
public static IObservable<T> TakeWhileInclusive<T>(
this IObservable<T> source, Func<T, bool> predicate)
{
return source.Publish(co => co.TakeWhile(predicate)
.Merge(co.SkipWhile(predicate).Take(1)));
}
And then:
var obs = listOfCommands.ToObservable()
.TakeWhileInclusive(c.CurrentIndex != c.TotalCount);
Final edit:
I based my solution off of Sergey's TakeWhileInclusive implementation in this thread - How to complete a Rx Observable depending on a condition in a event
public static IObservable<TSource> TakeUntil<TSource>(
this IObservable<TSource> source, Func<TSource, bool> predicate)
{
return Observable
.Create<TSource>(o => source.Subscribe(x =>
{
o.OnNext(x);
if (predicate(x))
o.OnCompleted();
},
o.OnError,
o.OnCompleted
));
}
You can use the TakeUntil
operator to take every item until a secondary source produces a value; in this case we can specify the second stream to be the first value after the predicate passes:
public static IObservable<TSource> TakeWhileInclusive<TSource>(
this IObservable<TSource> source,
Func<TSource, bool> predicate)
{
return source.TakeUntil(source.SkipWhile(x => predicate(x)).Skip(1));
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With