Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to cancel an observable sequence

I have a very simple IObservable<int> that acts as a pulse generator every 500ms:

var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
                                         i => TimeSpan.FromMilliseconds(500))

And I have a CancellationTokenSource (that is used to cancel other work that is going on simultaneously).

How can I use the cancellation token source to cancel my observable sequence?

like image 751
Ronald Wildenberg Avatar asked Jul 20 '11 09:07

Ronald Wildenberg


1 Answers

Here are two handy operators for canceling observable sequences. The difference between them is on what happens in case of cancellation. The TakeUntil causes a normal completion of the sequence (OnCompleted), while the WithCancellation causes an exceptional termination (OnError).

/// <summary>Returns the elements from the source observable sequence until the
/// CancellationToken is canceled.</summary>
public static IObservable<TSource> TakeUntil<TSource>(
    this IObservable<TSource> source, CancellationToken cancellationToken)
{
    return source
        .TakeUntil(Observable.Create<Unit>(observer =>
            cancellationToken.Register(() => observer.OnNext(default))));
}

/// <summary>Ties a CancellationToken to an observable sequence. In case of
/// cancellation propagates an OperationCanceledException to the observer.</summary>
public static IObservable<TSource> WithCancellation<TSource>(
    this IObservable<TSource> source, CancellationToken cancellationToken)
{
    return source
        .TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
            o.OnError(new OperationCanceledException(cancellationToken)))));
}

Usage example:

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

var pulses = Observable
    .Generate(0, i => true, i => i + 1, i => i, i => TimeSpan.FromMilliseconds(500))
    .WithCancellation(cts.Token);

Note: In case of cancellation, the custom operators presented above are unsubscribing instantly from the underlying observable. This is something to consider in case the observable includes side-effects. Putting the TakeUntil(cts.Token) before the operator that performs the side-effects will postpone the completion of the whole observable, until the side-effects are completed (graceful termination). Putting it after the side-effects will make the cancellation instantaneous, resulting potentially to any running code to continue running unobserved, in a fire-and-forget fashion.

like image 83
Theodor Zoulias Avatar answered Oct 07 '22 00:10

Theodor Zoulias