
Cancellation token for observable

How can I cancel the following kind of Rx observable from a Stop button click, given that it is created on a Start button click?

var instance = ThreadPoolScheduler.Instance; 

Observable.Interval(TimeSpan.FromSeconds(2), instance)
    .Subscribe(_ =>
    {
        Console.WriteLine(DateTime.Now); // dummy event
    });
Mdev asked Feb 12 '16 11:02


2 Answers

Just use one of the overloads of Subscribe that takes a CancellationToken:

observable.Subscribe(_ => Console.WriteLine(DateTime.UtcNow), cancellationToken);

This simplifies Jon Skeet's example:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        var instance = ThreadPoolScheduler.Instance;
        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

        Observable.Interval(TimeSpan.FromSeconds(0.5), instance)
            .Subscribe(_ => Console.WriteLine(DateTime.UtcNow), cts.Token);
        Thread.Sleep(10000);
    }
}
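For the Start/Stop button case in the question, the same overload can be driven by a CancellationTokenSource that is created on Start and cancelled on Stop. The following is only a sketch: the TokenTicker class and its Start and Stop methods are hypothetical names, and the button click handlers are assumed to call them.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical helper (not part of Rx): call Start() and Stop()
// from the Start and Stop button click handlers.
class TokenTicker
{
    private CancellationTokenSource _cts;

    public void Start()
    {
        // Cancel any earlier run so repeated Start clicks do not stack timers.
        Stop();
        _cts = new CancellationTokenSource();

        Observable.Interval(TimeSpan.FromSeconds(2), ThreadPoolScheduler.Instance)
            .Subscribe(_ => Console.WriteLine(DateTime.Now), _cts.Token);
    }

    public void Stop()
    {
        // Cancelling the token ends the subscription made in Start().
        _cts?.Cancel();
        _cts?.Dispose();
        _cts = null;
    }
}
```

Note that this overload of Subscribe returns void, so the token is the only handle on the subscription.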
Timothy Shields answered Sep 28 '22 03:09


Retain the IDisposable returned by Subscribe, and call Dispose on it when you want to unsubscribe.

There may well be a way of integrating the Rx IDisposable-based unsubscription with CancellationToken out of the box, but just calling Dispose would be a start. (You could always register a callback with the cancellation token that calls Dispose.)

Here's a short but complete example to demonstrate:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        var instance = ThreadPoolScheduler.Instance;
        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

        var disposable = Observable
            .Interval(TimeSpan.FromSeconds(0.5), instance)
            .Subscribe(_ => Console.WriteLine(DateTime.UtcNow));
        cts.Token.Register(() => disposable.Dispose());
        Thread.Sleep(10000);
    }
}
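Applied to the Start/Stop button scenario from the question, the same idea might look like the sketch below. The Ticker class and its Start and Stop methods are hypothetical names introduced for illustration; the button click handlers are assumed to call them.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Hypothetical helper (not part of Rx): call Start() and Stop()
// from the Start and Stop button click handlers.
class Ticker
{
    private IDisposable _subscription;

    public void Start()
    {
        // Dispose any earlier subscription so repeated Start clicks do not stack timers.
        _subscription?.Dispose();

        _subscription = Observable
            .Interval(TimeSpan.FromSeconds(2), ThreadPoolScheduler.Instance)
            .Subscribe(_ => Console.WriteLine(DateTime.Now));
    }

    public void Stop()
    {
        // Disposing the subscription unsubscribes from the interval.
        _subscription?.Dispose();
        _subscription = null;
    }
}
```

Keeping the IDisposable in a field is what lets a later Stop click reach the subscription that an earlier Start click created.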
Jon Skeet answered Sep 28 '22 03:09