Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava - How to set Observer to block

I would like my Observable to block until the operation is complete and then continue with the next method calls etc. Take a look at this code:

import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.functions.Func1;

Observable observer1 = Observable.just(1, 2, 3)
        .observeOn(AndroidSchedulers.mainThread());

Observable observer2 = observer1.map(new Func1<Integer, Integer>() {
    @Override
    public Integer call(Integer myint) {
        //multiples each int by 2
        return myint * 2;
    }
});

observer2.observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(AndroidSchedulers.mainThread());

observer2.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer i) {
        System.out.println("this is the Integer multiplied by two:" + i);
    }
});

System.out.println("I want this statement to come after multiplication completes");

I realize I can use the onComplete call back but that's not my point. I am trying to figure out how I can block an observer until it completes and then continue with the rest of my code. At this moment the log looks like this:

I/System.out﹕ I want this statement to come after multiplication completes
I/System.out﹕ this is the Integer multiplied by two:2
I/System.out﹕ this is the Integer multiplied by two:4
I/System.out﹕ this is the Integer multiplied by two:6

Also notice how I am observing and subscribing all on the MainThread, is this done by default if I don't specify?

like image 582
j2emanue Avatar asked Jan 22 '15 05:01

j2emanue


People also ask

How do you block Observable?

BlockingObservable is a variety of Observable that provides blocking operators. It can be useful for testing and demo purposes, but is generally inappropriate for production applications (if you think you need to use a BlockingObservable this is usually a sign that you should rethink your design).

How do you make an observer in RxJava?

create(.. target) where you could likely have your listen() implementation to call the target's onnext/onerror/oncomplete. Of course, there is much code to add for when the subscriber unsubscribes (if that) so that the listeners can be removed. But that's a start.

Is RxJava blocking?

Usually, asynchronous code is non-blocking: You call a method that returns immediately, allowing your code to continue its execution. Once the result of your call is available, it is returned via a callback. RxJava is asynchronous, too.

What is onNext in RxJava?

onNext(): This method is called when a new item is emitted from the Observable. onError(): This method is called when an error occurs and the emission of data is not successfully completed. onComplete(): This method is called when the Observable has successfully completed emitting all items.


1 Answers

If you want to block until an Observable completes use observable.toBlocking().forEach() instead of subscribe().

observer2
    .toBlocking()
    .forEach(new Action1<Integer>() {
        @Override
        public void call(Integer i) {
            System.out.println("this is the Integer multiplied by two:" + i);
        }
    });

There are a number of Blocking Observable Operators that can be used in addition to forEach() to obtain the desired effect. For example, if you only need the first item emitted then use observable.toBlocking().first()

Also, note that the RxJava API returns a new Observable for each of the calls you are making. Thus, the following line has no effect on the schedulers that observable2 uses.

observer2.observeOn(AndroidSchedulers.mainThread()).subscribeOn(AndroidSchedulers.mainThread());

It does create a new Observable with the specified schedulers but throws it away since the returned Observable is not assigned to any variable. You could do the following instead.

observer2
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(AndroidSchedulers.mainThread())
    .toBlocking()
    .forEach(new Action1<Integer>() {
        @Override
        public void call(Integer i) {
            System.out.println("this is the Integer multiplied by two:" + i);
        }
    });
like image 152
kjones Avatar answered Oct 20 '22 15:10

kjones