
Rx filter by criterion and n values after criterion

Filter by criterion only

changes.Where(p => Evaluate(p)).Subscribe(p => { /* do something */ });

But how could you make it so that you get the criterion value and n values after the criterion is fulfilled (and these n values do not have to match the evaluation criterion)?

  • E.g. I would like to subscribe to a stream that emits the value for which Evaluate(p) holds plus one value after that (and then starts to evaluate p again)
Cel asked Mar 03 '12 10:03



3 Answers

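Take a look at the SkipWhile and Take extension methods, which Rx defines for IObservable just as LINQ does for IEnumerable. You can try the following code (Take(n + 1) yields the matching item plus the n items after it; the sequence then completes, so only the first match is covered):

changes.SkipWhile(change => !Evaluate(change)).Take(n + 1).Subscribe(change => { /* do something */ });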

Edit

New code that takes every matching item from the sequence, each followed by a tail of n items (without retrieving items repeatedly):

// Let's assume elements in the sequence are of type Change and n is the tail length
int i = 0; // 0 = waiting for a match; 1..n+1 = items passed in the current run
Func<Change, bool> evaluateWithTail = change =>
{
    if (i <= 0 || i > n)
    {
        i = Evaluate(change) ? 1 : 0;
    }
    else
    {
        i++;
    }

    return i > 0;
};
// Note that the delegate is passed directly as the parameter, without wrapping it in a lambda expression.
// It keeps its state in i, so use it for one subscription only.
changes.Where(evaluateWithTail).Subscribe(change => { /* do something */ });
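
To see which items the counter lets through, here is a minimal sketch that runs the same logic over a plain array with LINQ to Objects, so it needs no Rx package. The WithTail helper, the TailFilterDemo class and the "multiples of 10" criterion are illustrative assumptions, not part of the answer above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class TailFilterDemo
{
    // Builds a stateful predicate that passes each matching item plus the n items after it.
    // Each returned delegate owns its counter, so create a fresh one per subscription.
    public static Func<T, bool> WithTail<T>(Func<T, bool> evaluate, int n)
    {
        int i = 0; // 0 = waiting for a match; 1..n+1 = items passed in the current run
        return item =>
        {
            if (i <= 0 || i > n)
                i = evaluate(item) ? 1 : 0; // not inside a tail: test the criterion
            else
                i++;                        // inside a tail: pass without testing
            return i > 0;
        };
    }

    static void Main()
    {
        // Hypothetical criterion: multiples of 10, with a tail of one item.
        var input = new[] { 1, 10, 3, 4, 20, 5, 6 };
        var kept = input.Where(WithTail<int>(x => x % 10 == 0, 1)).ToList();
        Console.WriteLine(string.Join(", ", kept)); // 10, 3, 20, 5
    }
}
```

Wrapping the counter in a factory method like this is one way to avoid sharing state between subscribers; the same delegate can be handed to the Rx Where operator.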
Andrii Kalytiiuk answered Sep 29 '22 18:09



Here's another implementation that's a bit shorter:

var filtered = source
  .SkipWhile( x => !Criteria(x) )
  .Take(3)  // the matching item plus the two items after it
  .Repeat();
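
One thing to keep in mind is that Repeat resubscribes to the source each time Take completes, so this pattern suits a hot source; a cold one would replay from its beginning on every resubscription. Below is a minimal sketch that feeds values through a Subject. It assumes the System.Reactive NuGet package is referenced; the RepeatDemo class, the Run method and the "multiples of 10" criterion are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;      // assumes the System.Reactive package is referenced
using System.Reactive.Subjects;

static class RepeatDemo
{
    // Pushes the given values through a hot Subject and collects what the pipeline emits.
    public static List<int> Run(IEnumerable<int> values)
    {
        var source = new Subject<int>(); // hot: a new subscription only sees later items
        var seen = new List<int>();

        using (source
            .SkipWhile(x => x % 10 != 0) // hypothetical criterion: multiples of 10
            .Take(2)                     // the match plus one following item
            .Repeat()                    // resubscribe and wait for the next match
            .Subscribe(seen.Add))
        {
            foreach (var x in values)
                source.OnNext(x);
        }
        return seen;
    }

    static void Main()
    {
        var seen = Run(new[] { 1, 10, 3, 4, 20, 5, 6 });
        Console.WriteLine(string.Join(", ", seen));
    }
}
```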
Matthew Finlay answered Sep 29 '22 19:09



My first thought was that you cannot build this by combining existing Rx operators, because what you want is essentially the Where operator, except that after it matches it should be "turned off" for the next N elements. As it turns out, you can do it with the Repeat operator, which shows how composable Rx is.

Anyway, you can also create a new operator using best practices for creating your own Rx operator:

static class Extensions {

  public static IObservable<T> WhereThenTake<T>(
    this IObservable<T> source,
    Predicate<T> predicate,
    Int32 count
  ) {
    if (source == null)
      throw new ArgumentNullException("source");
    if (predicate == null)
      throw new ArgumentNullException("predicate");
    if (count < 0)
      throw new ArgumentOutOfRangeException("count");
    return Observable.Create<T>(
      observer => {
        var finished = false; // set once the source has terminated
        var n = 0;            // tail items still to pass through unfiltered
        var disposable = source.Subscribe(
          x => {
            if (!finished) {
              if (n > 0) {
                observer.OnNext(x);
                n -= 1;
              }
              else if (predicate(x)) {
                n = count;
                observer.OnNext(x);
              }
            }
          },
          ex => { finished = true; observer.OnError(ex); },
          () => { finished = true; observer.OnCompleted(); }
        );
        return disposable;
      }
    );
  }

}

You then use it like this (Evaluate is your predicate and n is the number of items to pass through after the predicate matches):

changes.WhereThenTake(Evaluate, n).Subscribe( ... );
Martin Liversage answered Sep 29 '22 18:09
