I would like my Observable
to block until the operation is complete and then continue with the next method calls etc. Take a look at this code:
import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.functions.Func1;
Observable observer1 = Observable.just(1, 2, 3)
.observeOn(AndroidSchedulers.mainThread());
Observable observer2 = observer1.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer myint) {
//multiples each int by 2
return myint * 2;
}
});
observer2.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.mainThread());
observer2.subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.println("this is the Integer multiplied by two:" + i);
}
});
System.out.println("I want this statement to come after multiplication completes");
I realize I can use the onComplete
call back but that's not my point. I am trying to figure out how I can block an observer until it completes and then continue with the rest of my code. At this moment the log looks like this:
I/System.out﹕ I want this statement to come after multiplication completes
I/System.out﹕ this is the Integer multiplied by two:2
I/System.out﹕ this is the Integer multiplied by two:4
I/System.out﹕ this is the Integer multiplied by two:6
Also notice how I am observing and subscribing all on the MainThread, is this done by default if I don't specify?
BlockingObservable is a variety of Observable that provides blocking operators. It can be useful for testing and demo purposes, but is generally inappropriate for production applications (if you think you need to use a BlockingObservable this is usually a sign that you should rethink your design).
create(.. target) where you could likely have your listen() implementation to call the target's onnext/onerror/oncomplete. Of course, there is much code to add for when the subscriber unsubscribes (if that) so that the listeners can be removed. But that's a start.
Usually, asynchronous code is non-blocking: You call a method that returns immediately, allowing your code to continue its execution. Once the result of your call is available, it is returned via a callback. RxJava is asynchronous, too.
onNext(): This method is called when a new item is emitted from the Observable. onError(): This method is called when an error occurs and the emission of data is not successfully completed. onComplete(): This method is called when the Observable has successfully completed emitting all items.
If you want to block until an Observable completes use observable.toBlocking().forEach()
instead of subscribe()
.
observer2
.toBlocking()
.forEach(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.println("this is the Integer multiplied by two:" + i);
}
});
There are a number of Blocking Observable Operators that can be used in addition to forEach()
to obtain the desired effect. For example, if you only need the first item emitted then use observable.toBlocking().first()
Also, note that the RxJava API returns a new Observable for each of the calls you are making. Thus, the following line has no effect on the schedulers that observable2 uses.
observer2.observeOn(AndroidSchedulers.mainThread()).subscribeOn(AndroidSchedulers.mainThread());
It does create a new Observable with the specified schedulers but throws it away since the returned Observable is not assigned to any variable. You could do the following instead.
observer2
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.mainThread())
.toBlocking()
.forEach(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.println("this is the Integer multiplied by two:" + i);
}
});
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With