Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Track the (number of) observers in an Observable?

I have an observable which represents a stream of stock prices. If there are no observers on my observable sequence I'd like to be able to disconnect from the remote server that is supplying the stream of prices, but I don't want to do that until every observer has called Dispose(). Then in a similar fashion, when the first person calls Subscribe I'd like to reconnect to the remote server.

Is there a way to figure out how many observers have called subscribe on an observable? Or perhaps a way to know when observers are calling Subscribe or Dispose?

like image 897
Jonathan Beerhalter Avatar asked May 31 '12 18:05

Jonathan Beerhalter


People also ask

What is an observer in Observable?

Observer : Any object that wishes to be notified when the state of another object changes. Observable : Any object whose state may be of interest, and in whom another object may register an interest.

What is observer RxJS?

In RxJS an Observer is simply a set of callbacks (next, error, complete). Here's a simple example of an observer. import { Observer } from "rxjs"; const observer: Observer<string> = { next: (value: string) => console. log(`[observer] next`, value), error: (error: Error) => console.

What are observers in Angular?

Observables provide support for passing messages between parts of your application. They are used frequently in Angular and are a technique for event handling, asynchronous programming, and handling multiple values.

What is observer complete?

1. Complete Observer. This is a detached observer where the researcher is neither seen nor noticed by participants. It's one way of minimizing the Hawthorne Effect as participants are more likely to act natural when they don't know they're being observed.


2 Answers

I would simply use RefCount / Publish. I always feel like if I'm implementing IObservable I'm working way too hard.

myColdObservable.Publish().RefCount();

This will make your observable stop pulsing after everyone has disconnected. Here's a sample:

var coldObservable = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .ObserveOn(Scheduler.TaskPool)
    .Select(_ => DoSomething());

var refCountObs = coldObservable.Publish().RefCount();

CompositeDisposable d = new CompositeDisposable();
d.Add(refCountObs.Subscribe(n => Console.WriteLine("First got: " + n)));
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Second got: " + n)));
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Third got: " + n)));

//Wait a bit for work to happen
System.Threading.Thread.Sleep(10000);

//Everyone unsubscribes
d.Dispose();

//Observe that DoSomething is not called.
System.Threading.Thread.Sleep(3000);

This does not cover the case where you actually want to know the number of subscribers, but I think this fits with your requirements of stopping work if there are no subscribers.

like image 194
Anderson Imes Avatar answered Sep 27 '22 22:09

Anderson Imes


Bit of an old one but I came across this post as I had a problem where I needed to know the number of subscribers. Using Bart's suggestion I came up with this extension.

public static IObservable<T> CountSubscribers<T>(this IObservable<T> source, Action<int> countChanged)
{
 int count = 0;

 return Observable.Defer(() =>
 {
    count = Interlocked.Increment(ref count);
    countChanged(count);
    return source.Finally(() =>
     {
        count = Interlocked.Decrement(ref count);
        countChanged(count);
     });
 });
}
like image 40
Roland Pheasant Avatar answered Sep 28 '22 00:09

Roland Pheasant