I have this code:
var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();
var ss = s1.Merge(s2).Finally(() => Console.WriteLine("Finished!"));
ss.Subscribe(_ => Console.WriteLine("Next"));
s1.OnNext(new Unit());
s2.OnNext(new Unit());
s1.OnCompleted(); // I wish ss finished here.
s2.OnCompleted(); // Yet it does so here. =(
I've worked around the problem by calling OnError(new OperationCanceledException()), but I'd like a better solution (there has to be a combinator for this, right?).
Or this, which is also quite neat:
public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        return Observable.CreateWithDisposable<T>(obs =>
        {
            var compositeDisposable = new CompositeDisposable();
            var subject = new Subject<T>();
            compositeDisposable.Add(subject.Subscribe(obs));
            compositeDisposable.Add(source.Subscribe(subject));
            compositeDisposable.Add(right.Subscribe(subject));
            return compositeDisposable;
        });
    }
}
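Both sources feed the same subject, and a subject stops forwarding notifications after its first OnCompleted, so the observer passed to CreateWithDisposable receives exactly one OnCompleted: the one from whichever source finishes first. (In later Rx releases, CreateWithDisposable was, as far as I know, folded into Observable.Create.)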
Instead of rewriting Merge to finish when either stream completes, I would suggest converting the onCompleted events to onNext events and using var ss = s1.Merge(s2).TakeUntil(s1ors2complete), where s1ors2complete produces a value when either s1 or s2 ends. You could also just chain .TakeUntil(s1completes).TakeUntil(s2completes) instead of creating s1ors2complete. This approach composes better than a MergeWithCompleteOnEither extension, because it can turn any "complete when both complete" operator into a "complete when either completes" operator.
As for how to convert onCompleted events to onNext events, there are a few ways to do that. The CompositeDisposable method sounds like a good approach, and a bit of searching finds this interesting thread about converting between onNext, onError, and onCompleted notifications. I'd probably create an extension method called ReturnTrueOnCompleted using xs.SkipWhile(_ => true).Select(_ => true).Concat(Observable.Return(true)) (the Select is only there so the element type becomes bool)
and your merge then becomes:
var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
var ss = s1.Merge(s2).TakeUntil(s1ors2complete).Finally(() => Console.WriteLine("Finished!"));
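The ReturnTrueOnCompleted helper is only described in words above, so here is a minimal sketch of how it might look, assuming a current System.Reactive package. The class names CompletionExtensions and TakeUntilExample are made up for illustration, and the Select step is my addition to make the types line up; treat it as one possible implementation rather than the definitive one.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CompletionExtensions
{
    // Hypothetical helper: drops every value from xs and emits a single
    // `true` once xs completes, then completes itself.
    public static IObservable<bool> ReturnTrueOnCompleted<T>(this IObservable<T> xs)
    {
        return xs
            .SkipWhile(_ => true)               // the predicate never fails, so no value passes
            .Select(_ => true)                  // only changes the element type to bool
            .Concat(Observable.Return(true));   // runs after xs has completed
    }
}

public static class TakeUntilExample
{
    public static void Main()
    {
        var s1 = new Subject<int>();
        var s2 = new Subject<int>();

        // Produces a value as soon as either subject completes.
        var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());

        var ss = s1.Merge(s2)
                   .TakeUntil(s1ors2complete)
                   .Finally(() => Console.WriteLine("Finished!"));

        ss.Subscribe(x => Console.WriteLine("Next " + x));

        s1.OnNext(1);
        s2.OnNext(2);
        s1.OnCompleted(); // ss is expected to finish here
        s2.OnNext(3);     // arrives after ss has finished, so it should not be printed
    }
}
```

If an error on either source should also end the merged stream, no extra work is needed: Merge already forwards OnError from either input.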
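You could also look at using an operator like Zip, which completes once one of the input streams has completed. Note that Zip pairs up values from the two streams rather than interleaving them, so it only fits if that is the output you want.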
Assuming you don't need the output of either of the streams, you can use Amb combined with some magic from Materialize:
var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();
var ss = Observable.Amb(
        s1.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted),
        s2.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted)
    )
    .Finally(() => Console.WriteLine("Finished!"));
ss.Subscribe(_ => Console.WriteLine("Next"));
s1.OnNext(new Unit());
s2.OnNext(new Unit());
s1.OnCompleted(); // ss will finish here and s2 will be unsubscribed from
If you need the values, you can use Do on the two subjects.
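One way to read the Do suggestion is to attach the side effect before Materialize, so the values are still seen even though the Where filter hides them from Amb. The following is a rough sketch under that assumption; the class name DoExample and the log messages are invented for illustration.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DoExample
{
    public static void Main()
    {
        var s1 = new Subject<int>();
        var s2 = new Subject<int>();

        var ss = Observable.Amb(
                // Do observes each value as a side effect; only the
                // OnCompleted notification gets past the Where filter.
                s1.Do(x => Console.WriteLine("s1 value: " + x))
                  .Materialize()
                  .Where(n => n.Kind == NotificationKind.OnCompleted),
                s2.Do(x => Console.WriteLine("s2 value: " + x))
                  .Materialize()
                  .Where(n => n.Kind == NotificationKind.OnCompleted)
            )
            .Finally(() => Console.WriteLine("Finished!"));

        ss.Subscribe(_ => Console.WriteLine("One of the subjects completed"));

        s1.OnNext(1);
        s2.OnNext(2);
        s1.OnCompleted(); // ss is expected to finish here
        s2.OnNext(3);     // s2 should be unsubscribed by now, so Do no longer sees this
    }
}
```

The values are only available inside the Do callbacks here; the elements of ss itself are the completion notifications, not the original values.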