
Why do I need to dispose of subscriptions after completion?

The Intro to Rx book describes the return value of Subscribe as IDisposable and notes that subscriptions should still be disposed of even after OnError or OnCompleted has been called.

"An interesting thing to consider is that when a sequence completes or errors, you should still dispose of your subscription."

(From Intro to Rx: Lifetime Management, OnError and OnCompleted)

Why is this?


For reference, this is the class I'm currently working on. I'm probably going to submit it to code review at some point.

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

/// <summary>
/// Provides a timeout mechanism that will not timeout if it is signalled often enough
/// </summary>
internal class TrafficTimeout
{
    private readonly Action onTimeout;
    private object signalLock = new object();
    private IObserver<Unit> signals;

    /// <summary>
    /// Initialises a new instance of the <see cref="TrafficTimeout"/> class.
    /// </summary>
    /// <param name="timeout">The duration to wait after receiving signals before timing out.</param>
    /// <param name="onTimeout">The <see cref="Action"/> to perform when the timeout duration expires.</param>
    public TrafficTimeout(TimeSpan timeout, Action onTimeout)
    {
        // Subscribe to a throttled observable to trigger the expiry
        var messageQueue = new BehaviorSubject<Unit>(Unit.Default);
        IDisposable subscription = null;
        subscription = messageQueue.Throttle(timeout).Subscribe(
        p =>
        {
            messageQueue.OnCompleted();
            messageQueue.Dispose();
        });

        this.signals = messageQueue.AsObserver();
        this.onTimeout = onTimeout;
    }

    /// <summary>
    /// Signals that traffic has been received.
    /// </summary>
    public void Signal()
    {
        lock (this.signalLock)
        {
            this.signals.OnNext(Unit.Default);
        }
    }
}
asked Dec 18 '22 02:12 by Gusdor


1 Answer

The disposable returned by the Subscribe extension methods is returned solely to allow you to manually unsubscribe from the observable before the observable naturally ends.

If the observable completes - with either OnCompleted or OnError - then the subscription is already disposed for you.

Try this code:

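using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

var xs = Observable.Create<int>(o =>
{
    // Forward a single-value sequence to the observer
    var d = Observable.Return(1).Subscribe(o);
    // This action runs when the subscription to xs is torn down
    return Disposable.Create(() =>
    {
        Console.WriteLine("Disposed!");
        d.Dispose();
    });
});

// Note that .Dispose() is never called on this subscription
var subscription = xs.Subscribe(x => Console.WriteLine(x));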

If you run the above you'll see that "Disposed!" is written to the console when the observable completes, without you needing to call .Dispose() on the subscription.
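The same holds for the OnError path. The following is a minimal sketch, assuming the System.Reactive NuGet package is referenced; the class name ErrorPathDemo and the disposeCount counter are made up for illustration. It terminates the sequence with an error, then calls .Dispose() manually to show that the extra call does nothing further.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical demo class; not part of Rx
public static class ErrorPathDemo
{
    // Returns the number of times the cleanup action ran:
    // first after the error, then after an extra manual Dispose()
    public static (int AfterError, int AfterManualDispose) Run()
    {
        var disposeCount = 0;

        var xs = Observable.Create<int>(o =>
        {
            // Inner sequence that ends with OnError instead of OnCompleted
            var d = Observable
                .Throw<int>(new InvalidOperationException("boom"))
                .Subscribe(o);

            return Disposable.Create(() =>
            {
                disposeCount++;
                d.Dispose();
            });
        });

        var subscription = xs.Subscribe(
            x => Console.WriteLine(x),
            ex => Console.WriteLine("Error: " + ex.Message));

        var afterError = disposeCount;

        // Disposing a subscription that has already ended is safe
        subscription.Dispose();

        return (afterError, disposeCount);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("Cleanup runs after error: " + result.AfterError);
        Console.WriteLine("Cleanup runs after manual Dispose: " + result.AfterManualDispose);
    }
}
```

Both counts should come out as 1: the cleanup runs once when the error terminates the sequence, and the later manual .Dispose() is a no-op.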

One important thing to note: the garbage collector never calls .Dispose() on observable subscriptions, so you must dispose of your subscriptions if they have not (or may not have) naturally ended before your subscription goes out of scope.

Take this, for example:

var wc = new WebClient();

var ds = Observable
    .FromEventPattern<
        DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => wc.DownloadStringCompleted += h,
            h => wc.DownloadStringCompleted -= h);

var subscription =
    ds.Subscribe(d =>
        Console.WriteLine(d.EventArgs.Result));

The ds observable attaches its handler to the event only when it has a subscriber, and detaches it only when the observable completes or the subscription is disposed of. Because the source is an event, the observable never completes on its own (it keeps waiting for more events), so for the above example disposing of the subscription is the only way to detach from the event.
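To watch the attach and detach happen without a network call, here is a rough sketch that swaps WebClient for a hypothetical Ticker class whose event counts its attached handlers. Ticker, HandlerCount and DetachDemo are invented for this illustration, and the sketch assumes the System.Reactive package and a console app with no SynchronizationContext, so that handlers are added and removed synchronously.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical event source that counts its attached handlers
public class Ticker
{
    private EventHandler<EventArgs> handlers;

    public int HandlerCount { get; private set; }

    public event EventHandler<EventArgs> Tick
    {
        add { handlers += value; HandlerCount++; }
        remove { handlers -= value; HandlerCount--; }
    }

    public void Raise() => handlers?.Invoke(this, EventArgs.Empty);
}

public static class DetachDemo
{
    // Returns the handler count before subscribing, while subscribed,
    // and after the subscription has been disposed
    public static (int Before, int While, int After) Run()
    {
        var ticker = new Ticker();

        var ticks = Observable.FromEventPattern<EventArgs>(
            h => ticker.Tick += h,
            h => ticker.Tick -= h);

        var before = ticker.HandlerCount;   // nothing attached yet

        var subscription = ticks.Subscribe(_ => Console.WriteLine("tick"));
        var whileSubscribed = ticker.HandlerCount;

        ticker.Raise();                     // the sequence does not complete here

        subscription.Dispose();             // the only way to detach in this case
        return (before, whileSubscribed, ticker.HandlerCount);
    }

    public static void Main()
    {
        var r = Run();
        Console.WriteLine($"before={r.Before} while={r.While} after={r.After}");
    }
}
```

The handler is only attached between Subscribe and Dispose; raising the event any number of times in between does not detach it.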

When you have a FromEventPattern observable that you know will only ever produce one value, it is wise to add the .Take(1) extension method before subscribing. The observable then completes after the first event, the event handler detaches automatically, and you don't need to dispose of the subscription manually.

Like so:

var ds = Observable
    .FromEventPattern<
        DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => wc.DownloadStringCompleted += h,
            h => wc.DownloadStringCompleted -= h)
    .Take(1);
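As a rough check of that behaviour, the sketch below uses a hypothetical one-shot Job class in place of WebClient (Job, Finished, HandlerCount and TakeOneDemo are invented names) and again assumes the System.Reactive package in a console app with no SynchronizationContext. Nothing in it calls .Dispose().

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical one-shot event source that counts its attached handlers
public class Job
{
    private EventHandler<EventArgs> handlers;

    public int HandlerCount { get; private set; }

    public event EventHandler<EventArgs> Finished
    {
        add { handlers += value; HandlerCount++; }
        remove { handlers -= value; HandlerCount--; }
    }

    public void Finish() => handlers?.Invoke(this, EventArgs.Empty);
}

public static class TakeOneDemo
{
    // Returns whether the sequence completed and how many handlers
    // remain attached after the event has fired once
    public static (bool Completed, int HandlersLeft) Run()
    {
        var job = new Job();
        var completed = false;

        // The returned IDisposable is deliberately ignored
        Observable
            .FromEventPattern<EventArgs>(
                h => job.Finished += h,
                h => job.Finished -= h)
            .Take(1)
            .Subscribe(
                _ => Console.WriteLine("finished"),
                () => completed = true);

        job.Finish();   // first event: Take(1) completes and detaches

        return (completed, job.HandlerCount);
    }

    public static void Main()
    {
        var r = Run();
        Console.WriteLine($"completed={r.Completed} handlersLeft={r.HandlersLeft}");
    }
}
```

After the first event the sequence should report completion and the handler count should drop back to 0, even though the subscription was never disposed by hand.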

I hope this helps.

answered Feb 15 '23 22:02 by Enigmativity