I've got a simple program here that displays the number of letters in various words. It works as expected.
static void Main(string[] args) {
    var word = new Subject<string>();
    var wordPub = word.Publish().RefCount();
    var length = word.Select(i => i.Length);
    var report =
        wordPub
            .GroupJoin(length,
                s => wordPub,
                s => Observable.Empty<int>(),
                (w, a) => new { Word = w, Lengths = a })
            .SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
    report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}"));
    word.OnNext("Apple");
    word.OnNext("Banana");
    word.OnNext("Cat");
    word.OnNext("Donkey");
    word.OnNext("Elephant");
    word.OnNext("Zebra");
    Console.ReadLine();
}
And the output is:
Apple 5
Banana 6
Cat 3
Donkey 6
Elephant 8
Zebra 5
I used Publish().RefCount() because wordPub is included in report twice. Without it, when a word is emitted, first one part of the report would get notified by a callback, and then the other part of the report would be notified, doubling the notifications. That is kind of what happens; the output ends up having 11 items rather than 6. At least that is what I think is going on. I think of using Publish().RefCount() in this situation as simultaneously updating both parts of the report.
However, if I change the length function to ALSO use the published source, like this:
var length = wordPub.Select(i => i.Length);
Then the output is this:
Apple 5
Apple 6
Banana 6
Cat 3
Banana 3
Cat 6
Donkey 6
Elephant 8
Donkey 8
Elephant 5
Zebra 5
Why can't the length function also use the same published source?
This was a great challenge to solve! The conditions under which this happens are very subtle. Apologies in advance for the long explanation, but bear with me!
TL;DR
Subscriptions to the published source are processed in order, but before any other subscription made directly to the unpublished source, i.e. you can jump the queue!
With GroupJoin, subscription order is important because it determines when windows open and close.
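To make the queue-jumping concrete, here is a minimal sketch that is separate from your code. It assumes the System.Reactive package; the class name QueueJumpDemo and the labels pub-1, direct and pub-2 are made up for illustration. It records the order in which three observers are notified of a single value.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; none of these names come from the question.
public static class QueueJumpDemo
{
    public static List<string> Run()
    {
        var word = new Subject<string>();
        var wordPub = word.Publish().RefCount();
        var order = new List<string>();

        // First subscriber to wordPub: RefCount connects wordPub to word,
        // so word's observer list is now [wordPub].
        wordPub.Subscribe(_ => order.Add("pub-1"));

        // Direct subscriber: word's observer list is now [wordPub, direct].
        word.Subscribe(_ => order.Add("direct"));

        // Subscribed last, but it lives inside wordPub's own observer list,
        // so it is notified as part of wordPub's turn.
        wordPub.Subscribe(_ => order.Add("pub-2"));

        word.OnNext("Banana");
        return order;
    }

    public static void Main()
    {
        // Prints: pub-1, pub-2, direct
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Even though pub-2 subscribed after direct, it is notified first, because it sits behind the single wordPub entry at the head of the subject's observer list.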
My first concern would be that you are publish-refcounting a subject. This should be a no-op, since Subject<T> has no subscription cost.
So when you remove the Publish().RefCount():
var word = new Subject<string>();
var wordPub = word;//.Publish().RefCount();
var length = word.Select(i => i.Length);
then you get the same issue.
So then I look to the GroupJoin (because my intuition suggests that Publish().RefCount() is a red herring).
For me, eyeballing this alone was too hard to rationalise, so I lean on a simple debugging tool I have used dozens of times over the years: a Trace or Log extension method.
// Namespaces used below (in LINQPad you may need to add these via the query's namespace imports instead).
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public interface ILogger
{
    void Log(string input);
}

public class DumpLogger : ILogger
{
    public void Log(string input)
    {
        // LINQPad `Dump()` extension method.
        // Could use Console.WriteLine instead.
        input.Dump();
    }
}

public static class ObservableLoggingExtensions
{
    // Shared counter so that every subscription gets a unique, ordered label.
    private static int _index = 0;

    public static IObservable<T> Log<T>(this IObservable<T> source, ILogger logger, string name)
    {
        return Observable.Create<T>(o =>
        {
            var index = Interlocked.Increment(ref _index);
            var label = $"{index:0000}{name}";
            logger.Log($"{label}.Subscribe()");
            // Logged when the subscription is disposed.
            var disposed = Disposable.Create(() => logger.Log($"{label}.Dispose()"));
            var subscription = source
                .Do(
                    x => logger.Log($"{label}.OnNext({x})"),
                    ex => logger.Log($"{label}.OnError({ex})"),
                    () => logger.Log($"{label}.OnCompleted()")
                )
                .Subscribe(o);
            return new CompositeDisposable(subscription, disposed);
        });
    }
}
When I add the logging to your provided code it looks like this:
var logger = new DumpLogger();
var word = new Subject<string>();
var wordPub = word.Publish().RefCount();
var length = word.Select(i => i.Length);
var report =
    wordPub.Log(logger, "lhs")
        .GroupJoin(length.Log(logger, "rhs"),
            s => wordPub.Log(logger, "lhsDuration"),
            s => Observable.Empty<int>().Log(logger, "rhsDuration"),
            (w, a) => new { Word = w, Lengths = a })
        .SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
report.Subscribe(i => ($"{i.Word} {i.Length}").Dump("OnNext"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Donkey");
word.OnNext("Elephant");
word.OnNext("Zebra");
This will then output something like the following in my log.
Log with Publish().RefCount() used:
0001lhs.Subscribe()
0002rhs.Subscribe()
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Banana 6
...
However, when I remove the usage of Publish().RefCount(), the new log output is as follows.
Log with only Subject:
0001lhs.Subscribe()
0002rhs.Subscribe()
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Apple 6
OnNext
Banana 6
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
...
This gives us some insight; however, the issue really becomes clear when we start annotating our logs with a logical list of subscriptions.
In the original (working) code with the RefCount, our annotations might look like this:
//word.Subscribers.Add(wordPub)
0001lhs.Subscribe() //wordPub.Subscribers.Add(0001lhs)
0002rhs.Subscribe() //word.Subscribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe() //wordPub.Subscribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe() //wordPub.Subscribers.Add(0005lhsDuration)
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose() //wordPub.Subscribers.Remove(0003lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Banana 6
So in this example, when word.OnNext("Banana"); is executed, the chain of observers is linked in this order:
1. wordPub
2. 0002rhs
However, wordPub has child subscriptions! So the real subscription list looks like:
1. wordPub
   1. 0001lhs
   2. 0003lhsDuration
   3. 0005lhsDuration
2. 0002rhs
If we annotate the Subject-only log, we see where the subtlety lies:
0001lhs.Subscribe() //word.Subscribers.Add(0001lhs)
0002rhs.Subscribe() //word.Subscribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe() //word.Subscribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe() //word.Subscribers.Add(0005lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Apple 6
OnNext
Banana 6
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
So in this example, when word.OnNext("Banana"); is executed, the chain of observers is linked in this order:
1. 0001lhs
2. 0002rhs
3. 0003lhsDuration
4. 0005lhsDuration
As the 0003lhsDuration subscription is activated after the 0002rhs subscription, it won't see the "Banana" value that terminates the window until after the rhs has been sent the value, so the rhs value is yielded into the still-open window.
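The same ordering can be reproduced without GroupJoin at all. The following is a minimal sketch, assuming the System.Reactive package; the class name LateSubscriberDemo is made up, and the lhs, rhs and lhsDuration labels only mimic the roles in the log above. An observer that subscribes to a plain Subject<T> while a value is being dispatched goes to the end of the observer list, and it does not receive the value that is currently in flight.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical demo class; it imitates the Subject-only log, it is not the GroupJoin implementation.
public static class LateSubscriberDemo
{
    public static List<string> Run()
    {
        var word = new Subject<string>();
        var order = new List<string>();

        // Plays the role of 0001lhs: on "Apple" it subscribes a duration observer,
        // just as GroupJoin subscribes to the left duration when a window opens.
        word.Subscribe(x =>
        {
            order.Add($"lhs:{x}");
            if (x == "Apple")
            {
                // Added while "Apple" is being dispatched, so it is placed
                // after rhs in the list and misses "Apple" itself.
                word.Subscribe(y => order.Add($"lhsDuration:{y}"));
            }
        });

        // Plays the role of 0002rhs.
        word.Subscribe(x => order.Add($"rhs:{x}"));

        word.OnNext("Apple");
        word.OnNext("Banana");
        return order;
    }

    public static void Main()
    {
        // Prints: lhs:Apple, rhs:Apple, lhs:Banana, rhs:Banana, lhsDuration:Banana
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

For "Banana", rhs is notified before lhsDuration, which mirrors why the length 6 is delivered while the "Apple" window is still open.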
Whew
As @francezu13k50 points out, the obvious and simple solution to your problem is to just use word.Select(x => new { Word = x, Length = x.Length });, but as I think you have given us a simplified version of your real problem (appreciated), I understand why this isn't suitable.
However, as I don't know what your real problem space is, I am not sure what to suggest as a solution, except that you have one with your current code, and now you should know why it works the way it does.