I have a very simple IObservable<int>
that acts as a pulse generator every 500ms:
var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
i => TimeSpan.FromMilliseconds(500))
And I have a CancellationTokenSource
(that is used to cancel other work that is going on simultaneously).
How can I use the cancellation token source to cancel my observable sequence?
Here are two handy operators for canceling observable sequences. The difference between them is on what happens in case of cancellation. The TakeUntil
causes a normal completion of the sequence (OnCompleted
), while the WithCancellation
causes an exceptional termination (OnError
).
/// <summary>Returns the elements from the source observable sequence until the
/// CancellationToken is canceled.</summary>
public static IObservable<TSource> TakeUntil<TSource>(
this IObservable<TSource> source, CancellationToken cancellationToken)
{
return source
.TakeUntil(Observable.Create<Unit>(observer =>
cancellationToken.Register(() => observer.OnNext(default))));
}
/// <summary>Ties a CancellationToken to an observable sequence. In case of
/// cancellation propagates an OperationCanceledException to the observer.</summary>
public static IObservable<TSource> WithCancellation<TSource>(
this IObservable<TSource> source, CancellationToken cancellationToken)
{
return source
.TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
o.OnError(new OperationCanceledException(cancellationToken)))));
}
Usage example:
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var pulses = Observable
.Generate(0, i => true, i => i + 1, i => i, i => TimeSpan.FromMilliseconds(500))
.WithCancellation(cts.Token);
Note: In case of cancellation, the custom operators presented above are unsubscribing instantly from the underlying observable. This is something to consider in case the observable includes side-effects. Putting the TakeUntil(cts.Token)
before the operator that performs the side-effects will postpone the completion of the whole observable, until the side-effects are completed (graceful termination). Putting it after the side-effects will make the cancellation instantaneous, resulting potentially to any running code to continue running unobserved, in a fire-and-forget fashion.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With