I have an observable which represents a stream of stock prices. If there are no observers on my observable sequence I'd like to be able to disconnect from the remote server that is supplying the stream of prices, but I don't want to do that until every observer has called Dispose(). Then in a similar fashion, when the first person calls Subscribe I'd like to reconnect to the remote server.
Is there a way to figure out how many observers have called subscribe on an observable? Or perhaps a way to know when observers are calling Subscribe or Dispose?
Observer : Any object that wishes to be notified when the state of another object changes. Observable : Any object whose state may be of interest, and in whom another object may register an interest.
In RxJS an Observer is simply a set of callbacks (next, error, complete). Here's a simple example of an observer. import { Observer } from "rxjs"; const observer: Observer<string> = { next: (value: string) => console. log(`[observer] next`, value), error: (error: Error) => console.
Observables provide support for passing messages between parts of your application. They are used frequently in Angular and are a technique for event handling, asynchronous programming, and handling multiple values.
1. Complete Observer. This is a detached observer where the researcher is neither seen nor noticed by participants. It's one way of minimizing the Hawthorne Effect as participants are more likely to act natural when they don't know they're being observed.
I would simply use RefCount / Publish. I always feel like if I'm implementing IObservable I'm working way too hard.
myColdObservable.Publish().RefCount();
This will make your observable stop pulsing after everyone has disconnected. Here's a sample:
var coldObservable = Observable
.Interval(TimeSpan.FromSeconds(1))
.ObserveOn(Scheduler.TaskPool)
.Select(_ => DoSomething());
var refCountObs = coldObservable.Publish().RefCount();
CompositeDisposable d = new CompositeDisposable();
d.Add(refCountObs.Subscribe(n => Console.WriteLine("First got: " + n)));
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Second got: " + n)));
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Third got: " + n)));
//Wait a bit for work to happen
System.Threading.Thread.Sleep(10000);
//Everyone unsubscribes
d.Dispose();
//Observe that DoSomething is not called.
System.Threading.Thread.Sleep(3000);
This does not cover the case where you actually want to know the number of subscribers, but I think this fits with your requirements of stopping work if there are no subscribers.
Bit of an old one but I came across this post as I had a problem where I needed to know the number of subscribers. Using Bart's suggestion I came up with this extension.
public static IObservable<T> CountSubscribers<T>(this IObservable<T> source, Action<int> countChanged)
{
int count = 0;
return Observable.Defer(() =>
{
count = Interlocked.Increment(ref count);
countChanged(count);
return source.Finally(() =>
{
count = Interlocked.Decrement(ref count);
countChanged(count);
});
});
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With