
Why is RefCount not working after all initial subscribers disconnect?

Consider the following:

[Fact]
public void foo()
{
    var result = new Subject<bool>();
    var startCount = 0;
    var completionCount = 0;
    var obs = Observable
        .Defer(() =>
            {
                ++startCount;
                return result.FirstAsync();
            })
        .Do(_ => ++completionCount)
        .Publish()
        .RefCount();

    // pretend there are lots of subscribers at once
    var s1 = obs.Subscribe();
    var s2 = obs.Subscribe();
    var s3 = obs.Subscribe();

    // even so, we only expect to be started once
    Assert.Equal(1, startCount);
    Assert.Equal(0, completionCount);

    // and we won't complete until the result ticks through
    result.OnNext(true);
    Assert.Equal(1, startCount);
    Assert.Equal(1, completionCount);

    s1.Dispose();
    s2.Dispose();
    s3.Dispose();

    // now try exactly the same thing again
    s1 = obs.Subscribe();
    s2 = obs.Subscribe();
    s3 = obs.Subscribe();

    // startCount is 4 here instead of the expected 2!
    Assert.Equal(2, startCount);
    Assert.Equal(1, completionCount);

    result.OnNext(true);
    Assert.Equal(2, startCount);
    Assert.Equal(2, completionCount);

    s1.Dispose();
    s2.Dispose();
    s3.Dispose();
}

My understanding of Publish + RefCount is that a connection to the source is maintained as long as there is at least one subscriber. Once the last subscriber disconnects, any future subscriber will re-initiate the connection to the source.

As you can see in my test, everything works perfectly the first time through. But the second time, the deferred observable inside the pipeline is executed once for every new subscriber.

I can see via the debugger that for the first group of subscribers, obs._count (which counts subscribers) increases for each call to Subscribe. But for the second group of subscribers, it remains zero.

Why is this happening and what can I do to rectify my pipeline?

Kent Boogaart asked Feb 29 '16 05:02


2 Answers

The answer from @user630190 is close, but incorrect, so I thought I'd answer myself.

It's because Publish will immediately complete new subscribers if the stream it published has itself completed. You can kind of see that in the diagram here:

[Diagram of the Publish operator; image not reproduced]

But it would have been nice if the diagram included a subscriber after the underlying stream completes.

To add to the confusion, Defer is still invoked each time a new subscriber triggers a reconnect. But the sequence it returns is effectively ignored, because Publish forwards it to a subject that already completed along with the initial stream.
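To make that concrete, here is a minimal sketch that leaves RefCount out and drives Connect by hand. It assumes Rx.NET's Subject-backed Publish; the class and method names are made up for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PublishCompletionDemo
{
    // Illustrative helper (hypothetical name): reports how often the Defer
    // factory ran and what a late subscriber observed.
    public static (int DeferCalls, bool LateCompleted, int LateValues) Run()
    {
        var result = new Subject<bool>();
        var deferCalls = 0;

        var published = Observable
            .Defer(() =>
            {
                ++deferCalls;
                return result.FirstAsync();
            })
            .Publish();

        // First connection: FirstAsync emits once and completes,
        // which also completes the subject inside Publish.
        var firstConnection = published.Connect();
        result.OnNext(true);
        firstConnection.Dispose();

        // A late subscriber is completed straight away by that finished subject.
        var lateCompleted = false;
        var lateValues = 0;
        published.Subscribe(_ => ++lateValues, () => lateCompleted = true);

        // Reconnecting runs the Defer factory again, but its values are pushed
        // into the already-completed subject and reach nobody.
        var secondConnection = published.Connect();
        result.OnNext(true);
        secondConnection.Dispose();

        return (deferCalls, lateCompleted, lateValues);
    }
}
```

Calling PublishCompletionDemo.Run() should give DeferCalls = 2, LateCompleted = true and LateValues = 0: the factory runs on every connect, yet the late subscriber only ever sees OnCompleted.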

I'm as yet unable to come up with a way to implement my intended use case. I thought perhaps I could use Multicast rather than Publish, creating a new subject as necessary, but I haven't been able to achieve that yet. And it seems rather painful for what I would think is a common use case.
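For what it's worth, one way the "new subject as necessary" idea might be sketched is to build a fresh Publish().RefCount() chain whenever the previous one has terminated or been disconnected. ShareWithReset below is a hypothetical helper, not part of Rx.NET, and it does not try to handle every race (a subscriber can still grab a chain just as it terminates), so treat it as a starting point rather than a finished operator.

```csharp
using System;
using System.Reactive.Linq;

public static class ShareWithResetExtensions
{
    // Hypothetical helper: shares one connection among concurrent subscribers,
    // and starts over with a new subject after completion or disconnect.
    public static IObservable<T> ShareWithReset<T>(this IObservable<T> source)
    {
        var gate = new object();
        IObservable<T> current = null;

        return Observable.Defer(() =>
        {
            lock (gate)
            {
                if (current == null)
                {
                    IObservable<T> created = null;
                    created = source
                        // Runs when the shared source subscription terminates
                        // or is disposed, so the next subscriber gets a new chain.
                        .Finally(() =>
                        {
                            lock (gate)
                            {
                                if (ReferenceEquals(current, created))
                                {
                                    current = null;
                                }
                            }
                        })
                        .Publish()
                        .RefCount();
                    current = created;
                }

                return current;
            }
        });
    }
}
```

In the test from the question, this would replace the .Publish().RefCount() pair with a single .ShareWithReset() call. The Finally hook is the design choice that matters here: it fires both when the source completes and when RefCount disconnects, which are the two cases in which the old subject should be discarded.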

Kent Boogaart answered Nov 13 '22 05:11


It's because the underlying observable result has already completed. So each new subscriber is just getting the OnCompleted callback.

If Observable.Defer were creating a new sequence each time, or one that didn't complete, you would see the desired behavior.

e.g.

return result.FirstAsync().Concat(Observable.Never<bool>());

You will need to remove the Assert.Equal(1, completionCount);
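A minimal sketch of that change dropped into the pipeline from the question, with the assertions replaced by a returned counter. The class and method names are only for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class NeverCompletingSourceDemo
{
    // Illustrative helper (hypothetical name): subscribes two groups of three
    // subscribers and returns how many times the Defer factory ran.
    public static int CountStartsAcrossTwoGroups()
    {
        var result = new Subject<bool>();
        var startCount = 0;

        var obs = Observable
            .Defer(() =>
            {
                ++startCount;
                // Emit the first value, then stay open so the subject
                // inside Publish never completes.
                return result.FirstAsync().Concat(Observable.Never<bool>());
            })
            .Publish()
            .RefCount();

        for (var round = 0; round < 2; ++round)
        {
            var s1 = obs.Subscribe();
            var s2 = obs.Subscribe();
            var s3 = obs.Subscribe();

            result.OnNext(true);

            // Disposing the last subscriber disconnects from the source.
            s1.Dispose();
            s2.Dispose();
            s3.Dispose();
        }

        return startCount;
    }
}
```

CountStartsAcrossTwoGroups() should return 2, one start per group. The trade-off is that subscribers never receive OnCompleted, so they have to unsubscribe themselves.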

user630190 answered Nov 13 '22 04:11