Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RX: Execute an action when a subscription is started or disposed?

I would like to keep track of the number of active subscriptions to my IObservable ? I thought I could implement this by incrementing / decrementing a counter whenever "someone" calls Subscribe / Dispose.

How can I do that ? Or is there a better way of doing it ?

It seems to be done by RefCount internally but the subscriptions counter is not exposed.

Thanks

like image 734
Clement Avatar asked Mar 13 '12 05:03

Clement


2 Answers

The easiest way to do this is by wrapping your Observable in Observable.Create:

IObservable<string> myObs;

var returnObservable = Observable.Create<string>(subj => {

    // TODO: Write code to do stuff on Sub

    var disp = myObs.Subscribe(subj);

    return Disposable.Create(() => {
        disp.Dispose();

        // TODO: Write code to do stuff in unsub.
    });
});
like image 60
Ana Betts Avatar answered Oct 24 '22 00:10

Ana Betts


Thanks that was what I needed. This can be turned into an operator as follows:

public static IObservable<TSource> OnSubscribe<TSource>(this IObservable<TSource> source, Action onSubscribe, Action onDispose)
{
    return
        Observable
            .Create<TSource>(observer =>
            {
                onSubscribe?.Invoke();
                var subscription = source.Subscribe(observer);
                return () =>
                {
                    subscription.Dispose();
                    onDispose?.Invoke();
                };
            });
}
like image 22
Clement Avatar answered Oct 24 '22 00:10

Clement