The Intro to Rx book describes the return value of Subscribe as IDisposable and notes that subscriptions should be disposed of when OnError and OnCompleted are called.
An interesting thing to consider is that when a sequence completes or errors, you should still dispose of your subscription.
From Intro to RX: Lifetime Management, OnError and OnCompleted
Why is this?
For reference, this is the class I'm currently working on. I'm probably going to submit it to code review at some point.
    using System;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    /// <summary>
    /// Provides a timeout mechanism that will not time out if it is signalled often enough
    /// </summary>
    internal class TrafficTimeout
    {
        private readonly Action onTimeout;
        private object signalLock = new object();
        private IObserver<Unit> signals;

        /// <summary>
        /// Initialises a new instance of the <see cref="TrafficTimeout"/> class.
        /// </summary>
        /// <param name="timeout">The duration to wait after receiving signals before timing out.</param>
        /// <param name="onTimeout">The <see cref="Action"/> to perform when the timeout duration expires.</param>
        public TrafficTimeout(TimeSpan timeout, Action onTimeout)
        {
            // Subscribe to a throttled observable to trigger the expiry
            var messageQueue = new BehaviorSubject<Unit>(Unit.Default);
            IDisposable subscription = null;
            subscription = messageQueue.Throttle(timeout).Subscribe(
                p =>
                {
                    messageQueue.OnCompleted();
                    messageQueue.Dispose();
                });

            this.signals = messageQueue.AsObserver();
            this.onTimeout = onTimeout;
        }

        /// <summary>
        /// Signals that traffic has been received.
        /// </summary>
        public void Signal()
        {
            lock (this.signalLock)
            {
                this.signals.OnNext(Unit.Default);
            }
        }
    }
The disposable returned by the Subscribe extension methods is returned solely to allow you to manually unsubscribe from the observable before the observable naturally ends.
If the observable completes, with either OnCompleted or OnError, then the subscription is already disposed for you.
Try this code:
    var xs = Observable.Create<int>(o =>
    {
        var d = Observable.Return(1).Subscribe(o);
        return Disposable.Create(() =>
        {
            Console.WriteLine("Disposed!");
            d.Dispose();
        });
    });

    var subscription = xs.Subscribe(x => Console.WriteLine(x));
If you run the above, you'll see that "Disposed!" is written to the console when the observable completes, without you needing to call .Dispose() on the subscription.
One important thing to note: the garbage collector never calls .Dispose() on observable subscriptions, so you must dispose of your subscriptions yourself if they have not (or may not have) naturally ended before your subscription goes out of scope.
Take this, for example:
    var wc = new WebClient();

    var ds = Observable
        .FromEventPattern<
            DownloadStringCompletedEventHandler,
            DownloadStringCompletedEventArgs>(
                h => wc.DownloadStringCompleted += h,
                h => wc.DownloadStringCompleted -= h);

    var subscription =
        ds.Subscribe(d =>
            Console.WriteLine(d.EventArgs.Result));
The ds observable will only attach to the event handler when it has a subscription, and will only detach when the observable completes or the subscription is disposed of. Because it wraps an event, the observable never completes (it is always waiting for more events), so in the above example disposing is the only way to detach from the event.
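To see the attach and detach behaviour directly, here is a minimal sketch. It assumes a console application with the System.Reactive package installed. The Ticker class, its Tick event and the Raise method are hypothetical names made up for this illustration; the custom add/remove accessors exist only to log when Rx attaches and detaches its handler.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical event source, used only for illustration.
internal class Ticker
{
    private EventHandler<EventArgs> handlers;

    // Custom accessors so we can log when a handler is attached or detached.
    public event EventHandler<EventArgs> Tick
    {
        add { this.handlers += value; Console.WriteLine("handler attached"); }
        remove { this.handlers -= value; Console.WriteLine("handler detached"); }
    }

    public void Raise() => this.handlers?.Invoke(this, EventArgs.Empty);
}

internal static class Program
{
    private static void Main()
    {
        var ticker = new Ticker();

        // Nothing is attached yet: the observable is lazy until subscribed.
        var ticks = Observable.FromEventPattern<EventArgs>(
            h => ticker.Tick += h,
            h => ticker.Tick -= h);

        // Subscribing is what attaches the handler to the event.
        var subscription = ticks.Subscribe(_ => Console.WriteLine("tick"));
        ticker.Raise();

        // The event never completes, so disposing is what detaches the handler.
        subscription.Dispose();

        // No handler is attached any more, so this raise is not observed.
        ticker.Raise();
    }
}
```

If the Dispose call is removed, the handler stays attached for as long as the ticker object is alive.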
When you have a FromEventPattern observable that you know will only ever return one value, it is wise to add the .Take(1) extension method before subscribing. The event handler then detaches automatically after the first value, and you don't need to manually dispose of the subscription.
Like so:
    var ds = Observable
        .FromEventPattern<
            DownloadStringCompletedEventHandler,
            DownloadStringCompletedEventArgs>(
                h => wc.DownloadStringCompleted += h,
                h => wc.DownloadStringCompleted -= h)
        .Take(1);
I hope this helps.