Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Composing Async Observables that have dependencies using RxJava

I am new to reactive programming and confused about composing observables that have dependencies. Here is the scenario: There are two observables A, B. Observable A depends on a value emitted by B. (Therefore A needs to observe B). Is there a way to create an Observable C that composes A and B, and emits V? I am just looking for pointers in the RxJava documentation.

like image 696
user3398673 Avatar asked Mar 09 '14 15:03

user3398673


1 Answers

You question is a bit vague on how A depends on B so I'll try to give a several examples of how to combine observables.

Example - A cannot be created without B - Use map()

public class B {
    public final int value;
    public B(int value) {
        this.value = value;
    }
}

public class A {
    public final B b;
    public A(B b) {
        this.b = b;
    }
}

public Observable<B> createObservableB() {
    return Observable.from(new B(0), new B(1), new B(2), new B(3));
}

public Observable<A> createObservableA() {
    return createObservableB()
            .map(new Func1<B, A>() {
                @Override
                public A call(B b) {
                    return new A(b);
                }
            });
}

Example - Each occurrence of B can create zero or more A - Use flatMap()

public class B {
    public final int value;
    public B(int value) {
        this.value = value;
    }
}

public class A {
    public final int value;
    public A(int value) {
        this.value = value;
    }
}

public Observable<B> createObservableB() {
    return Observable.from(new B(0), new B(1), new B(2), new B(3));
}

public Observable<A> createObservableA() {
    return createObservableB()
            .flatMap(new Func1<B, Observable<? extends A>>() {
                @Override
                public Observable<? extends A> call(final B b) {
                    return Observable.create(new Observable.OnSubscribe<A>() {
                        @Override
                        public void call(Subscriber<? super A> subscriber) {
                            for (int i = 0; i < b.value; i++) {
                                subscriber.onNext(new A(i));
                            }
                            subscriber.onCompleted();
                        }
                    });
                }
            });
}

I'm not exactly sure what you are asking with Observables C and V so let's look at a few more ways to combine observables.

Example - Combine each pair of items emitted by two observables - Use zip()

public class A {
    public final int value;
    public A(int value) {
        this.value = value;
    }
}

public class B {
    public final int value;
    public B(int value) {
        this.value = value;
    }
}

public class C {
    private final A a;
    private final B b;
    public C(A a, B b) {
        this.a = a;
        this.b = b;
    }
}

public Observable<B> createObservableB() {
    return Observable.from(new B(0), new B(1), new B(2), new B(3));
}

public Observable<A> createObservableA() {
    return Observable.from(new A(0), new A(1), new A(2), new A(3));
}

public Observable<C> createObservableC() {
    return Observable.zip(createObservableA(), createObservableB(),
            new Func2<A, B, C>() {
                @Override
                public C call(A a, B b) {
                    return new C(a, b);
                }
            }
    );
}

Example - Combine the last item of two Observables - Use combineLatest()

// Use the same class definitions from previous example.
public Observable<C> createObservableC1() {
    return Observable.combineLatest(createObservableA(), createObservableB(),
            new Func2<A, B, C>() {
                @Override
                public C call(A a, B b) {
                    return new C(a, b);
                }
            }
    );
}
like image 82
kjones Avatar answered Oct 11 '22 23:10

kjones