I am new to reactive programming and confused about composing observables that have dependencies. Here is the scenario: There are two observables A, B. Observable A depends on a value emitted by B. (Therefore A needs to observe B). Is there a way to create an Observable C that composes A and B, and emits V? I am just looking for pointers in the RxJava documentation.
Your question is a bit vague about how A depends on B, so I'll give several examples of how to combine observables.
Example - A cannot be created without B - Use map()
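import rx.Observable;
import rx.functions.Func1;

public class B {
    public final int value;

    public B(int value) {
        this.value = value;
    }
}

public class A {
    public final B b;

    public A(B b) {
        this.b = b;
    }
}

public Observable<B> createObservableB() {
    // just(...) emits a fixed set of items; RxJava 1.x no longer has the
    // multi-argument from(...) overloads.
    return Observable.just(new B(0), new B(1), new B(2), new B(3));
}

public Observable<A> createObservableA() {
    // map() turns each emitted B into exactly one A.
    return createObservableB()
            .map(new Func1<B, A>() {
                @Override
                public A call(B b) {
                    return new A(b);
                }
            });
}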
Example - Each occurrence of B can create zero or more A - Use flatMap()
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;

public class B {
    public final int value;

    public B(int value) {
        this.value = value;
    }
}

public class A {
    public final int value;

    public A(int value) {
        this.value = value;
    }
}

public Observable<B> createObservableB() {
    // just(...) emits a fixed set of items; RxJava 1.x no longer has the
    // multi-argument from(...) overloads.
    return Observable.just(new B(0), new B(1), new B(2), new B(3));
}

public Observable<A> createObservableA() {
    // flatMap() maps each B to an inner Observable<A> and merges the results.
    return createObservableB()
            .flatMap(new Func1<B, Observable<? extends A>>() {
                @Override
                public Observable<? extends A> call(final B b) {
                    return Observable.create(new Observable.OnSubscribe<A>() {
                        @Override
                        public void call(Subscriber<? super A> subscriber) {
                            // Emit b.value items, so B(0) produces no A at all.
                            for (int i = 0; i < b.value; i++) {
                                subscriber.onNext(new A(i));
                            }
                            subscriber.onCompleted();
                        }
                    });
                }
            });
}
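To make the one-to-many shape of the flatMap() example concrete, here is a minimal sketch that models the same expansion with java.util.stream instead of RxJava. It is an analogy only: the class FlatMapSketch and the method expand are hypothetical names, and plain integers stand in for the B and A objects. Because the inner observables in the example above emit synchronously, the order shown here should match, but flatMap() in RxJava may interleave items when the inner observables are asynchronous.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FlatMapSketch {

    // Hypothetical helper: each B value b expands to the A values 0..b-1,
    // mirroring the for-loop inside the inner observable above.
    public static List<Integer> expand(List<Integer> bValues) {
        return bValues.stream()
                .flatMap(b -> IntStream.range(0, b).boxed())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // B(0) contributes nothing, B(1) -> 0, B(2) -> 0, 1, B(3) -> 0, 1, 2
        System.out.println(expand(Arrays.asList(0, 1, 2, 3)));
        // prints [0, 0, 1, 0, 1, 2]
    }
}
```

So four B items yield six A items here, and an item such as B(0) yields none.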
I'm not exactly sure what you are asking with Observables C and V so let's look at a few more ways to combine observables.
Example - Combine each pair of items emitted by two observables - Use zip()
import rx.Observable;
import rx.functions.Func2;

public class A {
    public final int value;

    public A(int value) {
        this.value = value;
    }
}

public class B {
    public final int value;

    public B(int value) {
        this.value = value;
    }
}

public class C {
    private final A a;
    private final B b;

    public C(A a, B b) {
        this.a = a;
        this.b = b;
    }
}

public Observable<B> createObservableB() {
    // just(...) emits a fixed set of items; RxJava 1.x no longer has the
    // multi-argument from(...) overloads.
    return Observable.just(new B(0), new B(1), new B(2), new B(3));
}

public Observable<A> createObservableA() {
    return Observable.just(new A(0), new A(1), new A(2), new A(3));
}

public Observable<C> createObservableC() {
    // zip() pairs the n-th A with the n-th B and emits one C per pair.
    return Observable.zip(createObservableA(), createObservableB(),
            new Func2<A, B, C>() {
                @Override
                public C call(A a, B b) {
                    return new C(a, b);
                }
            }
    );
}
Example - Combine the latest items emitted by two Observables - Use combineLatest()
// Use the same class definitions from previous example.
public Observable<C> createObservableC1() {
    return Observable.combineLatest(createObservableA(), createObservableB(),
            new Func2<A, B, C>() {
                @Override
                public C call(A a, B b) {
                    return new C(a, b);
                }
            }
    );
}
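The difference between zip() and combineLatest() is easiest to see on a timeline where the two sources emit at different moments. The following is a rough plain-Java model of the two pairing rules, not RxJava code: CombineSketch, Event, and the two static methods are hypothetical names, integers stand in for the A and B objects, and completion and error handling are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class CombineSketch {

    // One emission on the shared timeline: which source fired and with what value.
    public static final class Event {
        final char source; // 'A' or 'B'
        final int value;

        public Event(char source, int value) {
            this.source = source;
            this.value = value;
        }
    }

    // zip rule: buffer items per source and emit only when both have an unpaired item.
    public static List<String> zip(List<Event> timeline) {
        Deque<Integer> pendingA = new ArrayDeque<>();
        Deque<Integer> pendingB = new ArrayDeque<>();
        List<String> out = new ArrayList<>();
        for (Event e : timeline) {
            (e.source == 'A' ? pendingA : pendingB).add(e.value);
            if (!pendingA.isEmpty() && !pendingB.isEmpty()) {
                out.add("C(" + pendingA.poll() + "," + pendingB.poll() + ")");
            }
        }
        return out;
    }

    // combineLatest rule: once both sources have emitted, every new item
    // is combined with the most recent item from the other source.
    public static List<String> combineLatest(List<Event> timeline) {
        Integer latestA = null;
        Integer latestB = null;
        List<String> out = new ArrayList<>();
        for (Event e : timeline) {
            if (e.source == 'A') {
                latestA = e.value;
            } else {
                latestB = e.value;
            }
            if (latestA != null && latestB != null) {
                out.add("C(" + latestA + "," + latestB + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Timeline: A0, A1, B0, B1, A2
        List<Event> timeline = Arrays.asList(
                new Event('A', 0), new Event('A', 1),
                new Event('B', 0), new Event('B', 1),
                new Event('A', 2));
        System.out.println(zip(timeline));           // prints [C(0,0), C(1,1)]
        System.out.println(combineLatest(timeline)); // prints [C(1,0), C(1,1), C(2,1)]
    }
}
```

In this model zip() never pairs A2 because no third B arrives, while combineLatest() skips A0 (it was overwritten before any B existed) and reuses B1 when A2 arrives.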