Given:
Integer[] arr1 = {1, 5, 9, 17};
Integer[] arr2 = {1, 2, 3, 6, 7, 12, 15};
Observable<Integer> o1 = Observable.from(arr1);
Observable<Integer> o2 = Observable.from(arr2);
How can I get an Observable that emits 1, 1, 2, 3, 5, 6, 7, 9, 12, 15, 17?
You can merge, sort, and flatten the sequences, but this has significant overhead, because toSortedList() buffers every element and emits nothing until both sources have completed:
o1.mergeWith(o2).toSortedList().flatMapIterable(v -> v).subscribe(...)
or
o1.concatWith(o2).toSortedList().flatMapIterable(v -> v).subscribe(...)
Otherwise, you need to write a fairly complicated operator.
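For intuition about what such an operator has to do, here is a minimal sketch of the underlying two-pointer merge on plain arrays. It is not an Rx operator and makes no claim about how any library implements this; the class name SortedMergeSketch and the method mergeSorted are made up for illustration, and both inputs are assumed to be sorted ascending already.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only: merges two ascending arrays.
class SortedMergeSketch {
    static List<Integer> mergeSorted(Integer[] a, Integer[] b) {
        List<Integer> out = new ArrayList<>(a.length + b.length);
        int i = 0, j = 0;
        // While both sides still have values, emit the smaller head.
        // Ties go to the first array, so equal values keep a stable order.
        while (i < a.length && j < b.length) {
            if (a[i] <= b[j]) {
                out.add(a[i++]);
            } else {
                out.add(b[j++]);
            }
        }
        // One side is exhausted; the remainder of the other is already sorted.
        while (i < a.length) out.add(a[i++]);
        while (j < b.length) out.add(b[j++]);
        return out;
    }

    public static void main(String[] args) {
        Integer[] arr1 = {1, 5, 9, 17};
        Integer[] arr2 = {1, 2, 3, 6, 7, 12, 15};
        // prints [1, 1, 2, 3, 5, 6, 7, 9, 12, 15, 17]
        System.out.println(mergeSorted(arr1, arr2));
    }
}
```

This needs a single pass and no sort, which is the saving a dedicated operator is after. The hard part in Rx is that the values arrive asynchronously, so "the head of the other side" may not be known yet.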
Edit 04/06/2015:
Here is an operator that does this sorted-merge more efficiently.
Edit: Please see the_joric's comment if you're going to use this. There is an edge case that isn't handled, and since I don't see a quick way to fix it, I don't have time to fix it right now.
Here's a solution in C#, since you have the system.reactive tag.
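using System;
using System.Reactive.Linq;

static IObservable<int> MergeSorted(IObservable<int> a, IObservable<int> b)
{
    // Tag each value with its source so the merged stream can be split again.
    var source = Observable.Merge(
        a.Select(x => Tuple.Create('a', x)),
        b.Select(y => Tuple.Create('b', y)));
    // Publish shares one subscription to source, so cold inputs are subscribed only once.
    return source.Publish(o =>
    {
        var published_a = o.Where(t => t.Item1 == 'a').Select(t => t.Item2);
        var published_b = o.Where(t => t.Item1 == 'b').Select(t => t.Item2);
        // Hold each value until the other side emits something at least as large.
        return Observable.Merge(
            published_a.Delay(x => published_b.FirstOrDefaultAsync(y => x <= y)),
            published_b.Delay(y => published_a.FirstOrDefaultAsync(x => y <= x)));
    });
}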
The idea is summarized as follows.
When a emits the value x, we delay it until b emits a value y such that x <= y.
When b emits the value y, we delay it until a emits a value x such that y <= x.
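The same "hold a value until the other side has caught up" rule can be pictured without Rx. The sketch below is a plain-Java, queue-based variant and not a translation of the Publish/Delay code in this answer: the class SortedMergeBuffer and its methods are hypothetical names, both inputs are assumed to arrive in ascending order, and completion of each side is signalled explicitly by the caller.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration: merges two ascending event streams as values arrive.
class SortedMergeBuffer {
    private final Deque<Integer> pendingA = new ArrayDeque<>();
    private final Deque<Integer> pendingB = new ArrayDeque<>();
    private final List<Integer> out = new ArrayList<>();
    private boolean doneA;
    private boolean doneB;

    void nextA(int x) { pendingA.add(x); drain(); }
    void nextB(int y) { pendingB.add(y); drain(); }
    void completeA() { doneA = true; drain(); }
    void completeB() { doneB = true; drain(); }

    // Values released so far, in order.
    List<Integer> emitted() { return out; }

    private void drain() {
        while (true) {
            if (!pendingA.isEmpty() && !pendingB.isEmpty()) {
                // Both heads are known, so the smaller one cannot be undercut later.
                if (pendingA.peek() <= pendingB.peek()) {
                    out.add(pendingA.poll());
                } else {
                    out.add(pendingB.poll());
                }
            } else if (!pendingA.isEmpty() && doneB) {
                // b will never emit again, so nothing can precede a's head.
                out.add(pendingA.poll());
            } else if (!pendingB.isEmpty() && doneA) {
                out.add(pendingB.poll());
            } else {
                // A value is still waiting on the other side, or nothing is pending.
                return;
            }
        }
    }
}
```

With the arrays from the question, calling nextA for each element of arr1 and nextB for each element of arr2 in any interleaving, followed by completeA() and completeB(), leaves emitted() in ascending order. The trade-off is that a value stays buffered for as long as the other side is silent.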
If you only had hot observables, you could do the following. It would not work if there were any cold observables in the mix, so I would advise always using the version that works for both hot and cold observables.
static IObservable<int> MergeSortedHot(IObservable<int> a, IObservable<int> b)
{
    return Observable.Merge(
        a.Delay(x => b.FirstOrDefaultAsync(y => x <= y)),
        b.Delay(y => a.FirstOrDefaultAsync(x => y <= x)));
}