Consider the following:
    [Fact]
    public void foo()
    {
        var result = new Subject<bool>();
        var startCount = 0;
        var completionCount = 0;
        var obs = Observable
            .Defer(() =>
            {
                ++startCount;
                return result.FirstAsync();
            })
            .Do(_ => ++completionCount)
            .Publish()
            .RefCount();

        // pretend there are lots of subscribers at once
        var s1 = obs.Subscribe();
        var s2 = obs.Subscribe();
        var s3 = obs.Subscribe();

        // even so, we only expect to be started once
        Assert.Equal(1, startCount);
        Assert.Equal(0, completionCount);

        // and we won't complete until the result ticks through
        result.OnNext(true);
        Assert.Equal(1, startCount);
        Assert.Equal(1, completionCount);
        s1.Dispose();
        s2.Dispose();
        s3.Dispose();

        // now try exactly the same thing again
        s1 = obs.Subscribe();
        s2 = obs.Subscribe();
        s3 = obs.Subscribe();

        // startCount is 4 here instead of the expected 2!
        Assert.Equal(2, startCount);
        Assert.Equal(1, completionCount);
        result.OnNext(true);
        Assert.Equal(2, startCount);
        Assert.Equal(2, completionCount);
        s1.Dispose();
        s2.Dispose();
        s3.Dispose();
    }
My understanding of Publish + RefCount is that a connection to the source is maintained as long as there is at least one subscriber. Once the last subscriber disconnects, any future subscriber will re-initiate the connection to the source.
As you can see in my test, everything works perfectly the first time through. But the second time, the deferred observable inside the pipeline is executed once for every new subscriber.
I can see via the debugger that for the first group of subscribers, obs._count (which counts subscribers) increases for each call to Subscribe. But for the second group of subscribers, it remains zero.
Why is this happening and what can I do to rectify my pipeline?
The answer from @user631090 is close, but incorrect, so I thought I'd answer myself.
It's because Publish will immediately complete new subscribers if the stream it published has itself completed. You can kind of see that in the diagram for Publish (not reproduced here), although it would have been nice if the diagram included a subscriber arriving after the underlying stream completes.
To add to the confusion, Defer is still called for new subscribers. But its return value is simply ignored by Publish, because the initial stream has already completed.
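The mechanism can be seen in isolation with a bare Subject<T>, which, as far as I know, is what Publish uses internally to fan values out to subscribers. This is only a minimal sketch, assuming the System.Reactive package and C# top-level statements; the variable names are illustrative and not taken from the question.

```csharp
using System;
using System.Reactive.Subjects;

// A plain Subject<bool> stands in for the subject that Publish shares
// between subscribers (assumption: Publish is backed by a Subject<T>).
var subject = new Subject<bool>();

var earlyValues = 0;
var earlyCompleted = false;
subject.Subscribe(_ => ++earlyValues, () => earlyCompleted = true);

// The source produces one value and then terminates.
subject.OnNext(true);
subject.OnCompleted();

// A subscriber that arrives after termination sees no values,
// only an immediate OnCompleted.
var lateValues = 0;
var lateCompleted = false;
subject.Subscribe(_ => ++lateValues, () => lateCompleted = true);

Console.WriteLine($"early: values={earlyValues}, completed={earlyCompleted}");
// prints "early: values=1, completed=True"
Console.WriteLine($"late: values={lateValues}, completed={lateCompleted}");
// prints "late: values=0, completed=True"
```

Because the terminated subject is reused for the lifetime of the published observable, every later subscriber ends up in the "late" position.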
I'm as yet unable to come up with a way to implement my intended use case. I thought about using Multicast rather than Publish, creating a new subject as necessary, but I haven't been able to achieve that yet. And it seems rather painful for what I would think is a common use case.
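One possible direction, offered as a hedged sketch rather than an established recipe, is to throw away the whole Publish + RefCount wrapper once the source terminates or is disconnected, so that the next subscriber gets a fresh subject. The helper name ShareUntilTerminated is hypothetical and not part of System.Reactive; the sketch assumes the System.Reactive package and C# top-level statements, and it is not hardened against subscribers racing with termination on other threads.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper: shares one subscription to `source` among current
// subscribers, and builds a new Publish().RefCount() wrapper after the
// previous one has terminated or been disconnected.
IObservable<T> ShareUntilTerminated<T>(IObservable<T> source)
{
    var gate = new object();
    IObservable<T>? shared = null;

    return Observable.Defer(() =>
    {
        lock (gate)
        {
            // Reuse the live wrapper if there is one, otherwise create it.
            return shared ??= source
                // Finally runs when the connection to the source ends,
                // whether by completion, error, or disposal.
                .Finally(() => { lock (gate) { shared = null; } })
                .Publish()
                .RefCount();
        }
    });
}

var result = new Subject<bool>();
var startCount = 0;
var valueCount = 0;

var obs = ShareUntilTerminated(
    Observable
        .Defer(() =>
        {
            ++startCount;
            return result.FirstAsync();
        })
        .Do(_ => ++valueCount));

// First group of subscribers shares a single start.
var s1 = obs.Subscribe();
var s2 = obs.Subscribe();
var s3 = obs.Subscribe();
result.OnNext(true);
s1.Dispose();
s2.Dispose();
s3.Dispose();

// Second group triggers exactly one more start.
s1 = obs.Subscribe();
s2 = obs.Subscribe();
s3 = obs.Subscribe();
result.OnNext(true);

Console.WriteLine($"startCount={startCount}, valueCount={valueCount}");
// prints "startCount=2, valueCount=2"
```

The outer Defer only decides which wrapper a subscriber joins; the sharing itself is still done by Publish and RefCount.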
It's because the underlying observable result has already completed. So each new subscriber is just getting the OnCompleted callback.
If Observable.Defer were creating a new sequence each time, or one that didn't complete, you would see the desired behavior.
e.g.
    return result.FirstAsync().Concat(Observable.Never<bool>());
You will need to remove the Assert.Equal(1, completionCount); assertion.
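To make the Concat(Observable.Never<bool>()) suggestion concrete, here is a small sketch of the pipeline with that change applied. It assumes the System.Reactive package and C# top-level statements, and it leaves out the Do step from the question for brevity. The trade-off, as far as I can tell, is that subscribers never receive OnCompleted, so they have to be disposed explicitly.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var result = new Subject<bool>();
var startCount = 0;

var obs = Observable
    .Defer(() =>
    {
        ++startCount;
        // Never keeps the sequence open after the first value, so the
        // subject inside Publish is never terminated.
        return result.FirstAsync().Concat(Observable.Never<bool>());
    })
    .Publish()
    .RefCount();

// First group: one connection is shared by all three subscribers.
var s1 = obs.Subscribe();
var s2 = obs.Subscribe();
var s3 = obs.Subscribe();
result.OnNext(true);

// Disposing the last subscriber makes RefCount disconnect from the source.
s1.Dispose();
s2.Dispose();
s3.Dispose();

// Second group: RefCount reconnects once, so Defer runs one more time.
s1 = obs.Subscribe();
s2 = obs.Subscribe();
s3 = obs.Subscribe();

Console.WriteLine($"startCount={startCount}");
// prints "startCount=2"
```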