
RxJava Scheduler to observe on main thread

If I write something like this, then both the operation and notification will be on the current thread...

Observable.fromCallable(() -> "Do Something")
    .subscribe(System.out::println);

If I do the operation on a background thread like this, then both the operation and notification will be on a background thread...

Observable.fromCallable(() -> "Do Something")
    .subscribeOn(Schedulers.io())
    .subscribe(System.out::println);

If I want to do the work in the background and observe on the main thread in Android, I would do...

Observable.fromCallable(() -> "Do Something")
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(System.out::println);

But if I were writing a standard Java program, what is the equivalent way to state that I want to observe on the main thread?

Eurig Jones asked Jun 20 '17 16:06




2 Answers

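For RxJava 2, use blockingSubscribe(). It blocks the calling thread until the source terminates and runs the consumer on that calling thread, so the following prints the caller's thread name ("main" when called from main()) even though the items are produced on a computation thread: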

Flowable.fromArray(1, 2, 3)
    .subscribeOn(Schedulers.computation())
    .blockingSubscribe(integer -> {
        System.out.println(Thread.currentThread().getName());
    });
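
To make the idea of consuming on the calling thread concrete, here is a plain-JDK sketch with no RxJava involved. It is not how blockingSubscribe() is implemented; it only illustrates the same pattern under simple assumptions: a background thread produces items into a queue, and the calling thread blocks on that queue and handles each item itself. The class name BlockingConsumeSketch, the thread name, and the use of Optional.empty() as an end-of-stream marker are all made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingConsumeSketch {

    /**
     * Produces 1, 2, 3 on a background thread and consumes them on the
     * calling thread. Returns one log entry per item, recording which
     * thread handled it.
     */
    static List<String> run() throws InterruptedException {
        // Optional.empty() is used here as a hypothetical end-of-stream marker.
        BlockingQueue<Optional<Integer>> queue = new LinkedBlockingQueue<>();

        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                queue.add(Optional.of(i)); // emitted on the background thread
            }
            queue.add(Optional.empty());   // signal completion
        }, "background-producer");
        producer.start();

        List<String> log = new ArrayList<>();
        while (true) {
            Optional<Integer> next = queue.take(); // blocks the calling thread
            if (!next.isPresent()) {
                break; // producer is done
            }
            // This line runs on the calling thread, not on the producer thread.
            log.add(next.get() + " on " + Thread.currentThread().getName());
        }
        producer.join();
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

The trade-off is the same as with the blocking approach above: the calling thread can do nothing else until the producer signals completion.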
Dmitry answered Oct 16 '22 16:10


In RxJava 1, convert the Observable to a BlockingObservable via .toBlocking(); this gives you blocking methods that make the calling thread wait for completion, get a single item, and so on.

Tassos Bassoukos answered Oct 16 '22 16:10
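
Both answers above block the calling thread until the stream finishes. A plain Java program has no built-in counterpart to Android's main looper, so if the main thread should instead behave like an event loop that receives callbacks, one option is to build a small task queue that the main thread drains. The sketch below uses only the JDK; LoopExecutor, runUntilStopped() and stop() are hypothetical names invented for this illustration. Because LoopExecutor implements java.util.concurrent.Executor, it could presumably be wrapped with RxJava's Schedulers.from(Executor) and passed to observeOn(), though that wiring is not shown or tested here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class MainThreadLoopSketch {

    /** Hypothetical helper: tasks run on whichever thread calls runUntilStopped(). */
    static final class LoopExecutor implements Executor {
        private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
        private boolean running = true; // only touched on the loop thread

        @Override
        public void execute(Runnable task) {
            tasks.add(task); // safe to call from any thread
        }

        /** Posts a task that ends the loop once all earlier tasks have run. */
        void stop() {
            execute(() -> running = false);
        }

        /** Drains the queue on the calling thread until stop() takes effect. */
        void runUntilStopped() throws InterruptedException {
            while (running) {
                tasks.take().run();
            }
        }
    }

    /** Does the work on a background thread and delivers the result to the caller's thread. */
    static List<String> run() throws InterruptedException {
        LoopExecutor mainLoop = new LoopExecutor();
        ExecutorService background = Executors.newSingleThreadExecutor();
        List<String> log = new ArrayList<>();

        background.execute(() -> {
            String result = "Do Something"; // the operation, on the background thread
            // The notification is posted to the loop and runs on the loop thread.
            mainLoop.execute(() ->
                log.add(result + " on " + Thread.currentThread().getName()));
            mainLoop.stop();
        });

        mainLoop.runUntilStopped(); // the calling thread acts as the "main looper"
        background.shutdown();
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

The design choice here is that stop() is itself a queued task, so every notification posted before it is delivered before the loop exits.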