
Count of observers in a Rx Subject

With Rx, what is the best way to get the number of current observers in a Subject?

I have a scenario where I want to publish a message, but only if there are observers. If there are no observers, I need to do something else.

To get around this, I created my own ISubject implementation that exposes the count of an internal IObserver collection. I'm sure there must be an out-of-the-box way of doing this; I'm just not fully familiar with what Rx has to offer.

Thanks!

bedo asked Nov 29 '12 21:11


3 Answers

Use the Subject<T>.HasObservers property.

Source Code

I don't recall exactly when it was introduced, but I'm pretty sure that it wasn't always there. It was probably added in Rx 2.0.
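For the scenario in the question (publish only when someone is listening, otherwise do something else), a minimal sketch could look like the following. It assumes a version of System.Reactive in which Subject<T> exposes HasObservers; the PublishOrFallback helper and the messages are hypothetical names made up for illustration.

```csharp
using System;
using System.Reactive.Subjects;

public static class Program
{
    // Hypothetical helper: publishes the value only when the subject has at
    // least one observer; otherwise it runs the fallback action instead.
    // Returns true if the value was published.
    public static bool PublishOrFallback<T>(Subject<T> subject, T value, Action<T> fallback)
    {
        if (subject.HasObservers)
        {
            subject.OnNext(value);
            return true;
        }

        fallback(value);
        return false;
    }

    public static void Main()
    {
        var subject = new Subject<string>();
        Action<string> fallback = m => Console.WriteLine("no observers, handling: " + m);

        // No subscribers yet, so the fallback runs.
        PublishOrFallback(subject, "first", fallback);

        using (subject.Subscribe(m => Console.WriteLine("received: " + m)))
        {
            // One subscriber, so the message is published.
            PublishOrFallback(subject, "second", fallback);
        }

        // The subscription has been disposed, so the fallback runs again.
        PublishOrFallback(subject, "third", fallback);
    }
}
```

Note that the check and the publish are two separate steps, so an observer could subscribe or unsubscribe between them when other threads are involved. If that matters in your case, you would need your own synchronization around the helper.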

Dave Sexton answered Oct 03 '22 17:10


You should avoid writing your own observable (or subject) implementations whenever possible.

You could certainly try writing a wrapper class to help.

Try this:

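using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public class Countable
{
    private int _count;

    // Number of currently active subscriptions made through GetCountable.
    public int Count { get { return _count; } }

    // Wraps the source so that each subscription bumps the counter
    // and each disposal (or termination) decrements it again.
    public IObservable<T> GetCountable<T>(IObservable<T> source)
    {
        return Observable.Create<T>(o =>
        {
            Interlocked.Increment(ref _count);
            var subscription = source.Subscribe(o);
            var decrement = Disposable.Create(() =>
            {
                Interlocked.Decrement(ref _count);
            });
            return new CompositeDisposable(subscription, decrement);
        });
    }
}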

You can then write code like this:

var xs = new Subject<int>();
var countable = new Countable();
var ys = countable.GetCountable(xs);
Console.WriteLine(countable.Count);
var s1 = ys.Subscribe(y => { });
Console.WriteLine(countable.Count);
var s2 = ys.Subscribe(y => { });
Console.WriteLine(countable.Count);
s1.Dispose();
Console.WriteLine(countable.Count);
s2.Dispose();
Console.WriteLine(countable.Count);

My results running this are:

0
1
2
1
0

Enigmativity answered Oct 03 '22 18:10


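In RxJS (the JavaScript implementation), use subject.observers.length. Note that RxJS 7 marks observers as deprecated and adds a boolean subject.observed getter. Example: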

import { Subject } from 'rxjs'

let subject = new Subject()
let s1 = subject.subscribe(v => console.log('observerA: ' + v))

subject.next(1) // logs "observerA: 1"
console.log(subject.observers.length) // 1

// observerB unsubscribes itself once it has seen the value 3
let s2 = subject.subscribe(v => {
    console.log('observerB: ' + v)
    if (v === 3) s2.unsubscribe()
})

subject.next(2) // logs "observerA: 2" and "observerB: 2"
console.log(subject.observers.length) // 2

subject.next(3) // logs "observerA: 3" and "observerB: 3", then observerB unsubscribes
console.log(subject.observers.length) // 1

elixiao answered Oct 03 '22 17:10