
How to Merge two Observables so the result completes when any of the Observables completes?

I have this code:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();
var ss = s1.Merge(s2).Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());
s1.OnCompleted(); // I wish ss finished here.
s2.OnCompleted(); // Yet it does so here. =(

I've solved my problem using OnError(new OperationCanceledException()), but I'd like a better solution (there has to be a combinator for this, right?).

Pablo Montilla asked Feb 03 '11 15:02


3 Answers

Or this, which is also quite neat:

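public static class Ext
{
    // Merges two sequences, completing as soon as either one completes.
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        // Note: CreateWithDisposable comes from early Rx builds; later releases expose this overload as Observable.Create.
        return Observable.CreateWithDisposable<T>(obs =>
        {
            var compositeDisposable = new CompositeDisposable();
            var subject = new Subject<T>();

            // Attach the observer first so it sees everything the subject relays.
            compositeDisposable.Add(subject.Subscribe(obs));
            compositeDisposable.Add(source.Subscribe(subject));
            compositeDisposable.Add(right.Subscribe(subject));

            return compositeDisposable;
        });
    }
}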

This uses a Subject, which makes sure only one OnCompleted is pushed to the observer passed to CreateWithDisposable(): once the subject has completed, it ignores any further notifications from the other source.
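
For readers on a current System.Reactive package, here is a minimal sketch of the same idea. It assumes Observable.Create as the replacement for CreateWithDisposable; the Demo class is only there for illustration and is not part of the original answer.

```csharp
using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(
        this IObservable<T> source, IObservable<T> right)
    {
        // Assumption: Observable.Create stands in for the older CreateWithDisposable.
        return Observable.Create<T>(obs =>
        {
            // The subject relays values from both sources and stops
            // relaying after the first OnCompleted (or OnError) it receives.
            var subject = new Subject<T>();

            return new CompositeDisposable(
                subject.Subscribe(obs),
                source.Subscribe(subject),
                right.Subscribe(subject));
        });
    }
}

// Hypothetical demo harness mirroring the code in the question.
public static class Demo
{
    public static void Main()
    {
        var s1 = new Subject<Unit>();
        var s2 = new Subject<Unit>();

        var ss = s1.MergeWithCompleteOnEither(s2)
                   .Finally(() => Console.WriteLine("Finished!"));

        ss.Subscribe(_ => Console.WriteLine("Next"));

        s1.OnNext(Unit.Default);
        s2.OnNext(Unit.Default);
        s1.OnCompleted(); // ss should finish here
        s2.OnCompleted(); // no further effect on ss
    }
}
```

The subject is not synchronized, so if the two sources can emit from different threads you would probably want to serialize them first (for example with Synchronize).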

Ray Booysen answered Oct 20 '22 18:10


Instead of rewriting Merge to finish when either stream completes, I would suggest converting the onCompleted events to onNext events and using var ss = s1.Merge(s2).TakeUntil(s1ors2complete), where s1ors2complete produces a value when either s1 or s2 ends. You could also just chain .TakeUntil(s1completes).TakeUntil(s2completes) instead of creating s1ors2complete. This approach composes better than a MergeWithCompleteOnEither extension, because it can turn any "complete when both complete" operator into a "complete when any completes" operator.

As for how to convert onCompleted events to onNext events, there are a few ways to do that. The CompositeDisposable method sounds like a good approach, and a bit of searching finds this interesting thread about converting between onNext, onError, and onCompleted notifications. I'd probably create an extension method called ReturnTrueOnCompleted using xs.Select(_ => true).SkipWhile(_ => true).Concat(Observable.Return(true)) (the Select is there so that both halves of the Concat are IObservable<bool>), and your merge then becomes:

var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
var ss = s1.Merge(s2).TakeUntil(s1ors2complete).Finally(() => Console.WriteLine("Finished!"));
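
The ReturnTrueOnCompleted helper used above is not a built-in operator. A minimal sketch of it, assuming a current System.Reactive package (the CompletionExt class name is made up for the example), might look like this:

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical container class for the extension method.
public static class CompletionExt
{
    // Drops every value from xs and emits a single `true` once xs completes.
    public static IObservable<bool> ReturnTrueOnCompleted<T>(this IObservable<T> xs)
    {
        return xs.Select(_ => true)                 // project to bool so Concat type-checks
                 .SkipWhile(_ => true)              // skip every value
                 .Concat(Observable.Return(true));  // runs only after xs completes
    }
}

public static class Demo
{
    public static void Main()
    {
        var s1 = new Subject<Unit>();
        var s2 = new Subject<Unit>();

        var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
        var ss = s1.Merge(s2)
                   .TakeUntil(s1ors2complete)
                   .Finally(() => Console.WriteLine("Finished!"));

        ss.Subscribe(_ => Console.WriteLine("Next"));

        s1.OnNext(Unit.Default);
        s2.OnNext(Unit.Default);
        s1.OnCompleted(); // ss should finish here
    }
}
```

Because Concat only moves on after a normal completion, an error in either source is not turned into `true`; it flows through as an error instead.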

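You could also look at using an operator like Zip, which completes once one of the input streams has completed. Note that Zip pairs up values from the two streams rather than interleaving them, so it is not a drop-in replacement for Merge.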

Greg Bray answered Oct 20 '22 17:10


Assuming you don't need the output of either of the streams, you can use Amb combined with some magic from Materialize:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();

var ss = Observable.Amb(
        s1.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted), 
        s2.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted)
    )
    .Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());

s1.OnCompleted(); // ss will finish here and s2 will be unsubscribed from

If you need the values, you can use Do on the two subjects (before Materialize) to observe them as a side effect.
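
As a rough sketch of that suggestion, assuming a current System.Reactive package (the console messages and the Demo class are only illustrative):

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class Demo
{
    public static void Main()
    {
        var s1 = new Subject<Unit>();
        var s2 = new Subject<Unit>();

        var ss = Observable.Amb(
                // Do observes each value as a side effect before the
                // Where filter discards everything except the completion.
                s1.Do(_ => Console.WriteLine("s1 value"))
                  .Materialize()
                  .Where(x => x.Kind == NotificationKind.OnCompleted),
                s2.Do(_ => Console.WriteLine("s2 value"))
                  .Materialize()
                  .Where(x => x.Kind == NotificationKind.OnCompleted)
            )
            .Finally(() => Console.WriteLine("Finished!"));

        // The only element ss emits is the OnCompleted notification
        // of whichever subject finishes first.
        ss.Subscribe(_ => Console.WriteLine("Completion seen"));

        s1.OnNext(Unit.Default);
        s2.OnNext(Unit.Default);
        s1.OnCompleted(); // ss should finish here
    }
}
```

The values never reach ss itself in this version; if downstream operators need them, the TakeUntil approach from the previous answer is probably the better fit.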

Richard Szalay answered Oct 20 '22 17:10