changes.Where(p => Evaluate(p)).Subscribe(p => { // Do something });
But how could you make it so that you get the criterion value and n values after the criterion is fulfilled (and these n values do not have to match the evaluation criterion)?
Evaluate(p)
and one value after that (and then starts to evaluate p
again)Please take a look on SkipWhile and Take extension methods of IEnumerable. You can try following code:
changes.SkipWhile(change => Evaluate(change) == false).Take(n).Subscribe(change => { /* do something */ });
Edit
New code to take all matching items from sequence with n
items' tail (without retrieving items repeatedly)
// Let's assume elements in the sequence are of type Change
int i = 0;
Func<Change, bool> evaluateWithTail = change =>
{
if (i <= 0 || i > n)
{
i = Evaluate(change) ? 1 : 0;
}
else
{
i++;
}
return i > 0;
}
// Please note delegate is specified as parameter directly - without lambda expression
changes.Where(evaluateWithTail).Subscribe(change => { /* do something */ });
Here's another implementation that's a bit shorter:
var filtered = source
.SkipWhile( x => !Criteria(x) )
.Take(3)
.Repeat()
I don't think you can create an Rx operator by combining existing operators because essentially what you want is to use the Where
operator, but after it matches you want to "turn it off" for the next N elements. OK, apparently you can by using the Repeat
operator and that proves how composable Rx is.
Anyway, you can also create a new operator using best practices for creating your own Rx operator:
static class Extensions {
public static IObservable<T> WhereThenTake<T>(
this IObservable<T> source,
Predicate<T> predicate,
Int32 count
) {
if (source == null)
throw new ArgumentNullException("source");
if (predicate == null)
throw new ArgumentNullException("predicate");
if (count < 0)
throw new ArgumentException("count");
return Observable.Create<T>(
observer => {
var finished = false;
var n = 0;
var disposable = source.Subscribe(
x => {
if (!finished) {
if (n > 0) {
observer.OnNext(x);
n -= 1;
}
else if (predicate(x)) {
n = count;
observer.OnNext(x);
}
}
},
ex => { finished = true; observer.OnError(ex); },
() => { finished = true; observer.OnCompleted(); }
);
return disposable;
}
);
}
}
You then use it like this (Evaluate
is your predicate and n
is the number of items to pass through after the predicate matches):
changes.WhereThenTake(Evaluate, n).Subscribe( ... );
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With