
RxJava: combine two optional observables

I have two Observables, let's call them PeanutButter and Jelly. I'd like to combine them into a Sandwich Observable. I can do that using:

Observable<PeanutButter> peanutButterObservable = ...;
Observable<Jelly> jellyObservable = ...;
Observable<Sandwich> sandwichObservable = Observable.combineLatest(
    peanutButterObservable,
    jellyObservable,
    (pb, j) -> makeSandwich(pb, j));

The problem is that combineLatest waits for both the first PeanutButter and the first Jelly before emitting the first combined Sandwich. Jelly may never be emitted, in which case I never get the first Sandwich.

I'd like to combine the two feeds so that a combined item is emitted as soon as either feed emits its first item, regardless of whether the other feed has emitted anything yet. How do I do that in RxJava?

Warlax asked Dec 21 '17 19:12



2 Answers

One possible approach is to use the startWith operator so that each stream emits a known value upon subscription. That way combineLatest() fires as soon as either stream emits a real value. You just have to watch for the initial/signal values in the onNext consumer.

Something like this:

@Test
public void sandwiches() {
    final Observable<String> peanutButters = Observable.just("chunky", "smooth")
        .startWith("--initial--");

    final Observable<String> jellies = Observable.just("strawberry", "blackberry", "raspberry")
        .startWith("--initial--");

    Observable.combineLatest(peanutButters, jellies, (peanutButter, jelly) -> {
        return new Pair<>(peanutButter, jelly);
    })
    .subscribe(
        next -> {
            final String peanutButter = next.getFirst();
            final String jelly = next.getSecond();

            if(peanutButter.equals("--initial--") && jelly.equals("--initial--")) {
                // initial emissions
            } else if(peanutButter.equals("--initial--")) {
                // jelly emission
            } else if(jelly.equals("--initial--")) {
                // peanut butter emission
            } else {
                // peanut butter + jelly emissions
            }
        },
        error -> {
            System.err.println("## onError(" + error.getMessage() + ")");
        },
        () -> {
            System.out.println("## onComplete()");
        }
    );
}
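
If the "--initial--" sentinel strings feel fragile, one option is to wrap each value in java.util.Optional and start each stream with Optional.empty() instead. The sketch below is plain Java and shows only what the combiner function could look like under that assumption; the class name OptionalSandwich and the method describe are hypothetical, not part of RxJava.

```java
import java.util.Optional;

public class OptionalSandwich {
    // Hypothetical combiner: something like this could be passed to combineLatest
    // once both streams carry Optional values and start with Optional.empty().
    static String describe(Optional<String> peanutButter, Optional<String> jelly) {
        if (!peanutButter.isPresent() && !jelly.isPresent()) {
            // Both streams have only produced their initial empty value.
            return "nothing yet";
        }
        // An empty side means that stream has not emitted a real value yet.
        return peanutButter.orElse("no peanut butter") + " + " + jelly.orElse("no jelly");
    }

    public static void main(String[] args) {
        System.out.println(describe(Optional.empty(), Optional.empty()));
        System.out.println(describe(Optional.empty(), Optional.of("strawberry")));
        System.out.println(describe(Optional.of("chunky"), Optional.of("strawberry")));
    }
}
```

With this shape the subscriber no longer compares against magic strings; an absent side is visible in the type.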
homerman answered Oct 27 '22 10:10


I think this problem can be approached by using the merge and scan operators:

public class RxJavaUnitTestJava {
    public Observable<Sandwich> getSandwich(Observable<Jelly> jelly, Observable<PeanutButter> peanutButter) {
        return Observable.merge(jelly, peanutButter)
                .scan(new Sandwich(null, null), (BiFunction<Object, Object, Object>) (prevResult, newItem) -> {
                    Sandwich prevSandwich = (Sandwich) prevResult;

                    if (newItem instanceof Jelly) {
                        System.out.println("emitted: " + ((Jelly) newItem).tag);
                        return new Sandwich((Jelly) newItem, prevSandwich.peanutButter);
                    } else {
                        System.out.println("emitted: " + ((PeanutButter) newItem).tag);
                        return new Sandwich(prevSandwich.jelly, (PeanutButter) newItem);
                    }
                })
                .skip(1) // skip emitting scan's default item
                .cast(Sandwich.class);
    }

    @Test
    public void testGetSandwich() {
        PublishSubject<Jelly> jelly = PublishSubject.create();
        PublishSubject<PeanutButter> peanutButter = PublishSubject.create();

        getSandwich(jelly, peanutButter).subscribe(new Observer<Sandwich>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Sandwich sandwich) {
                System.out.println("onNext: Sandwich: " + sandwich.toString());
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

        jelly.onNext(new Jelly("jelly1"));
        jelly.onNext(new Jelly("jelly2"));
        peanutButter.onNext(new PeanutButter("peanutButter1"));
        jelly.onNext(new Jelly("jelly3"));
        peanutButter.onNext(new PeanutButter("peanutButter2"));
    }

    class Jelly {
        String tag;

        public Jelly(String tag) { 
            this.tag = tag;
        }
    }

    class PeanutButter {
        String tag;

        public PeanutButter(String tag) { 
            this.tag = tag; 
        }
    }

    class Sandwich {
        Jelly jelly;
        PeanutButter peanutButter;

        public Sandwich(Jelly jelly, PeanutButter peanutButter) {
            this.jelly = jelly;
            this.peanutButter = peanutButter;
        }

        @Override
        public String toString() {
            String jellyResult = (jelly != null) ? jelly.tag : "no jelly";
            String peanutButterResult = (peanutButter != null) ? peanutButter.tag : "no peanutButter";

            return jellyResult + " | " + peanutButterResult;
        }
    }
}

Output:

onSubscribe
emitted: jelly1
onNext: Sandwich: jelly1 | no peanutButter
emitted: jelly2
onNext: Sandwich: jelly2 | no peanutButter
emitted: peanutButter1
onNext: Sandwich: jelly2 | peanutButter1
emitted: jelly3
onNext: Sandwich: jelly3 | peanutButter1
emitted: peanutButter2
onNext: Sandwich: jelly3 | peanutButter2

The fact that Jelly, PeanutButter and Sandwich are all independent types makes it a bit more complex around casting and nullability in scan. If you have control over these types, this solution can be further improved.
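
As a rough illustration of that improvement, assume both ingredient types can implement a shared interface (called Ingredient here, a hypothetical name). The accumulator then needs no casts or instanceof checks. The sketch is plain Java and uses a loop over a list to stand in for merge followed by scan and skip(1); it is not the RxJava pipeline itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TypedSandwichFold {
    // Hypothetical common supertype: each ingredient knows how to update a sandwich.
    interface Ingredient {
        Sandwich addTo(Sandwich previous);
    }

    static final class Jelly implements Ingredient {
        final String tag;
        Jelly(String tag) { this.tag = tag; }
        @Override public Sandwich addTo(Sandwich previous) {
            return new Sandwich(this, previous.peanutButter);
        }
    }

    static final class PeanutButter implements Ingredient {
        final String tag;
        PeanutButter(String tag) { this.tag = tag; }
        @Override public Sandwich addTo(Sandwich previous) {
            return new Sandwich(previous.jelly, this);
        }
    }

    static final class Sandwich {
        final Jelly jelly;
        final PeanutButter peanutButter;
        Sandwich(Jelly jelly, PeanutButter peanutButter) {
            this.jelly = jelly;
            this.peanutButter = peanutButter;
        }
        @Override public String toString() {
            return (jelly != null ? jelly.tag : "no jelly") + " | "
                    + (peanutButter != null ? peanutButter.tag : "no peanutButter");
        }
    }

    // Stand-in for scan(seed, accumulator).skip(1): one sandwich per incoming ingredient.
    static List<String> fold(List<Ingredient> merged) {
        List<String> emitted = new ArrayList<>();
        Sandwich current = new Sandwich(null, null); // the seed, never emitted
        for (Ingredient item : merged) {
            current = item.addTo(current);
            emitted.add(current.toString());
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Ingredient> merged = Arrays.asList(
                new Jelly("jelly1"), new PeanutButter("peanutButter1"), new Jelly("jelly2"));
        fold(merged).forEach(System.out::println);
    }
}
```

Assuming both streams are typed as Observable<Ingredient>, the scan accumulator could then shrink to something like (sandwich, item) -> item.addTo(sandwich).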

dkarmazi answered Oct 27 '22 09:10