RxJava compare different items from two observables (java 7)

I have two Observables, ob1 and ob2, that emit items. If an item emitted by ob1 is also emitted by ob2, I want to ignore it.

Example:

ob1 contains A,B,C,D

ob2 contains E,C,B,G,J,O

Output:

ob contains A,D

What is the best way to express this?

Younes Ouchala asked Apr 10 '17 16:04



2 Answers

There is a contains operator, which returns an Observable<Boolean> that emits whether the source observable contains the given element.

Observable<Boolean> contains = observable2.contains(string);

What you need is to map every element of observable1 to one of those boolean observables:

observable1.map(new Func1<String, Observable<Boolean>>() {
    @Override
    public Observable<Boolean> call(final String string) {
        return observable2.contains(string);
    }
})

This gives you an Observable<Observable<Boolean>>, which is awkward to work with, so you concatenate all those boolean observables into one. Luckily, all you need to do for that is use concatMap instead of map:

observable1.concatMap(new Func1<String, Observable<Boolean>>() {
    @Override
    public Observable<Boolean> call(final String string) {
        return observable2.contains(string);
    }
})

Now you have observable1, which emits your elements, and an Observable<Boolean> that emits, for each element of observable1 in order, whether that element is contained in observable2:

observable1:            A ---- B ---- C ---- D
concatMapObservable:    false  true   true   false

You can zip those two observables into one that passes through the elements not contained in observable2 and replaces the others with an empty string:

[concatMapObservable].zipWith(observable1, new Func2<Boolean, String, String>() {
    @Override
    public String call(final Boolean contains, final String string) {
        return contains ? "" : string;
    }
})

Then you filter out the empty strings:

[zippedObservable].filter(new Func1<String, Boolean>() {
    @Override
    public Boolean call(final String string) {
        return !TextUtils.isEmpty(string);
    }
})
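
To see what each stage produces for the example input, here is a plain-Java sketch that mirrors the three steps (flags, zip, filter) on lists instead of observables. It is only an illustration and does not use RxJava; the names `PipelineTrace`, `flags`, `zip` and `filterEmpty` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: mirrors the observable pipeline on plain lists.
final class PipelineTrace {

    // Stage 1 (concatMap + contains): one flag per item of first,
    // true if second contains that item.
    static List<Boolean> flags(List<String> first, List<String> second) {
        List<Boolean> out = new ArrayList<Boolean>();
        for (String item : first) {
            out.add(second.contains(item));
        }
        return out;
    }

    // Stage 2 (zipWith): pair flag i with item i; contained items become "".
    static List<String> zip(List<Boolean> flags, List<String> items) {
        List<String> out = new ArrayList<String>();
        int n = Math.min(flags.size(), items.size());
        for (int i = 0; i < n; i++) {
            out.add(flags.get(i) ? "" : items.get(i));
        }
        return out;
    }

    // Stage 3 (filter): drop the "" placeholders.
    static List<String> filterEmpty(List<String> items) {
        List<String> out = new ArrayList<String>();
        for (String item : items) {
            if (!item.isEmpty()) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> ob1 = Arrays.asList("A", "B", "C", "D");
        List<String> ob2 = Arrays.asList("E", "C", "B", "G", "J", "O");

        List<Boolean> contained = flags(ob1, ob2);
        List<String> zipped = zip(contained, ob1);

        System.out.println(contained);           // prints [false, true, true, false]
        System.out.println(zipped);              // prints [A, , , D]
        System.out.println(filterEmpty(zipped)); // prints [A, D]
    }
}
```

One consequence of using "" as the placeholder is visible here: an item that is itself an empty string would be dropped by the last stage even if it is not in the second source.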

The whole code put together:

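// On Java 7, observable2 must be final because the anonymous inner class below captures it.
final Observable<String> observable1 = Observable.from(new String[]{"A", "B", "C", "D"});
final Observable<String> observable2 = Observable.from(new String[]{"E", "C", "B", "G", "J", "O"});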

observable1.concatMap(new Func1<String, Observable<Boolean>>() {
    @Override
    public Observable<Boolean> call(final String string) {
        return observable2.contains(string);
    }
}).zipWith(observable1, new Func2<Boolean, String, String>() {
    @Override
    public String call(final Boolean contains, final String string) {
        return contains ? "" : string;
    }
}).filter(new Func1<String, Boolean>() {
    @Override
    public Boolean call(final String string) {
        return !TextUtils.isEmpty(string);
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(final String string) {
        Log.d("observable:", string);
    }
});
Lamorak answered Oct 02 '22 14:10



The output of your operation cannot be computed until all items of ob2 have been received, as you cannot tell whether an item in ob1 will be contained in ob2 until you know all items that are part of ob2. As a result, you have to wait for ob2 to complete before you can then filter items from ob1.
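
The "collect all of ob2 first, then filter ob1" idea can be sketched without RxJava using plain collections. This is only an illustration of the logic, assuming both sources are finite; `SetDifferenceSketch` and `difference` are hypothetical names, not part of any library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of RxJava: a synchronous sketch of the same logic.
final class SetDifferenceSketch {

    // Returns the items of first that do not occur in second, keeping first's order.
    static List<String> difference(List<String> first, List<String> second) {
        // All of second must be known before any item of first can be judged.
        Set<String> seen = new HashSet<String>(second);
        List<String> result = new ArrayList<String>();
        for (String item : first) {
            if (!seen.contains(item)) {
                result.add(item);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> ob1 = Arrays.asList("A", "B", "C", "D");
        List<String> ob2 = Arrays.asList("E", "C", "B", "G", "J", "O");
        System.out.println(difference(ob1, ob2)); // prints [A, D]
    }
}
```

Collecting the second source into a set once makes each lookup cheap, which is the same reason the reactive version gathers ob2 into a single list before looking at ob1.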

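A possible solution using RxJava2 is below (the lambdas require Java 8 or a tool such as Retrolambda). Note that combineLatest only keeps the latest value from each source, so this relies on ob2 completing before ob1 starts emitting, which holds for the synchronous sources used here.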

    Observable<String> ob1 = Observable.fromArray("A", "B", "C", "D");
    Observable<String> ob2 = Observable.fromArray("E", "C", "B", "G", "J", "O");

    Observable.combineLatest(ob2.toList().toObservable(), ob1, (list, value) -> list.contains(value) ? "" : value)
            .filter(value -> !TextUtils.isEmpty(value))
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(value -> Log.d("foo", "Value: " + value));
Francesc answered Oct 02 '22 15:10
