How to merge two sorted Observables into a single sorted Observable?

Given:

Integer[] arr1 = {1, 5, 9, 17};
Integer[] arr2 = {1, 2, 3, 6, 7, 12, 15};
Observable<Integer> o1 = Observable.from(arr1);
Observable<Integer> o2 = Observable.from(arr2);

How can I get an Observable that emits 1, 1, 2, 3, 5, 6, 7, 9, 12, 15, 17?

ZhekaKozlov asked Jun 03 '15 20:06


2 Answers

You can merge, sort, and flatten the sequences, but this has significant overhead, because toSortedList() buffers every item and emits nothing until both sources have completed:

o1.mergeWith(o2).toSortedList().flatMapIterable(v -> v).subscribe(...)

or

o1.concatWith(o2).toSortedList().flatMapIterable(v -> v).subscribe(...)

Otherwise, you need to write a fairly complicated operator.

Edit 04/06/2015:

Here is an operator that does this sorted-merge more efficiently.
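The operator mentioned above is not reproduced here. As a rough illustration of the logic such an operator has to implement, here is a minimal plain-Java sketch of a sorted merge over two already-sorted Iterables. It does not use RxJava; the class name SortedMerge and its merge method are made up for this example, and it assumes neither input contains null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not part of RxJava: merges two already-sorted sequences.
final class SortedMerge {
    // Assumption: inputs are sorted ascending and contain no null elements.
    static <T extends Comparable<? super T>> List<T> merge(Iterable<T> left, Iterable<T> right) {
        Iterator<T> li = left.iterator();
        Iterator<T> ri = right.iterator();
        List<T> out = new ArrayList<>();
        // Hold at most one pending value per source (null means "source exhausted").
        T l = li.hasNext() ? li.next() : null;
        T r = ri.hasNext() ? ri.next() : null;
        while (l != null && r != null) {
            if (l.compareTo(r) <= 0) {
                // Left value is not larger, so it goes first (ties favor the left source).
                out.add(l);
                l = li.hasNext() ? li.next() : null;
            } else {
                out.add(r);
                r = ri.hasNext() ? ri.next() : null;
            }
        }
        // One side is exhausted; everything left on the other side is already in order.
        while (l != null) { out.add(l); l = li.hasNext() ? li.next() : null; }
        while (r != null) { out.add(r); r = ri.hasNext() ? ri.next() : null; }
        return out;
    }

    public static void main(String[] args) {
        Integer[] arr1 = {1, 5, 9, 17};
        Integer[] arr2 = {1, 2, 3, 6, 7, 12, 15};
        System.out.println(merge(Arrays.asList(arr1), Arrays.asList(arr2)));
        // prints [1, 1, 2, 3, 5, 6, 7, 9, 12, 15, 17]
    }
}
```

Unlike the toSortedList() approach, this needs only one pending value per source. An Rx operator has to make the same comparison while values arrive asynchronously, which is where the extra complexity comes from.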

akarnokd answered Oct 04 '22 11:10


Edit: Please see the_joric's comment if you're going to use this. There is an edge case that isn't handled; I don't see a quick way to fix it, and I don't have time to fix it right now.

Here's a solution in C#, since you have the system.reactive tag.

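// Requires: using System; using System.Reactive.Linq;
static IObservable<int> MergeSorted(IObservable<int> a, IObservable<int> b)
{
    // Tag each value with its source so both streams can travel in one merged sequence.
    var source = Observable.Merge(
        a.Select(x => Tuple.Create('a', x)),
        b.Select(y => Tuple.Create('b', y)));
    // Publish shares a single subscription to the merged sequence among all uses of o below.
    return source.Publish(o =>
    {
        // Split the shared sequence back into the two original streams.
        var published_a = o.Where(t => t.Item1 == 'a').Select(t => t.Item2);
        var published_b = o.Where(t => t.Item1 == 'b').Select(t => t.Item2);
        return Observable.Merge(
            // Hold each x until b produces some y with x <= y.
            published_a.Delay(x => published_b.FirstOrDefaultAsync(y => x <= y)),
            // Hold each y until a produces some x with y <= x.
            published_b.Delay(y => published_a.FirstOrDefaultAsync(x => y <= x)));
    });
}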

The idea is summarized as follows.

  • When a emits the value x, we delay it until b emits a value y such that x <= y.

  • When b emits the value y, we delay it until a emits a value x such that y <= x.
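The two rules above amount to holding a value back until the other stream has shown a value at least as large. A minimal plain-Java sketch of that buffering idea follows. It is not a port of the C# code and does not use any Rx library: PushSortedMerge and its onNext/onCompleted methods are hypothetical names, and treating completion of one source as permission to flush the other is this sketch's own choice.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration: a push-based sorted merge with one buffer per source.
final class PushSortedMerge {
    private final Queue<Integer> pendingA = new ArrayDeque<>();
    private final Queue<Integer> pendingB = new ArrayDeque<>();
    private boolean doneA;
    private boolean doneB;
    // Values released downstream, in release order.
    final List<Integer> emitted = new ArrayList<>();

    void onNextA(int x) { pendingA.add(x); drain(); }
    void onNextB(int y) { pendingB.add(y); drain(); }
    void onCompletedA() { doneA = true; drain(); }
    void onCompletedB() { doneB = true; drain(); }

    private void drain() {
        while (true) {
            Integer x = pendingA.peek();
            Integer y = pendingB.peek();
            if (x != null && y != null) {
                // Both sources have shown a value, so the smaller head is safe to release.
                emitted.add(x <= y ? pendingA.remove() : pendingB.remove());
            } else if (x != null && doneB) {
                // b will never emit again, so nothing can precede x.
                emitted.add(pendingA.remove());
            } else if (y != null && doneA) {
                // a will never emit again, so nothing can precede y.
                emitted.add(pendingB.remove());
            } else {
                // Either nothing is buffered, or the only buffered value must keep waiting.
                return;
            }
        }
    }

    public static void main(String[] args) {
        PushSortedMerge m = new PushSortedMerge();
        // Interleave the question's two sorted inputs in an arbitrary arrival order.
        m.onNextA(1); m.onNextB(1); m.onNextB(2); m.onNextA(5); m.onNextB(3);
        m.onNextB(6); m.onNextA(9); m.onNextB(7); m.onNextA(17); m.onCompletedA();
        m.onNextB(12); m.onNextB(15); m.onCompletedB();
        System.out.println(m.emitted);
        // prints [1, 1, 2, 3, 5, 6, 7, 9, 12, 15, 17]
    }
}
```

Each source must itself deliver values in ascending order for the released sequence to be sorted; the buffers grow only while one source runs ahead of the other.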

If both observables are hot, the simpler version below is enough. It does not work if either observable is cold, so I would advise always using the version above, which works for both hot and cold observables.

static IObservable<int> MergeSortedHot(IObservable<int> a, IObservable<int> b)
{
    return Observable.Merge(
        a.Delay(x => b.FirstOrDefaultAsync(y => x <= y)),
        b.Delay(y => a.FirstOrDefaultAsync(x => y <= x)));
}
Timothy Shields answered Oct 04 '22 10:10