Switching threads multiple times in Rx chain

Let's assume I have the following case on Android:

  1. Request list of groups from network
  2. Show some UI elements for each group
  3. Request items for each group
  4. Show UI elements for each item

I want to do this using RxJava:

webService.requestGroups()
        .flatMap(group -> {
            view.showGroup(group);
            return webService.requestItems(group);
        })
        .toList()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(items -> view.showItems(items));

As you can see, there are two calls on the view object, each of which must run on the main thread, and two calls on webService, which must run on a background thread.

The problem with this code is that the first view call runs on a background thread, which causes an Android runtime exception (something like "Only the original thread that created a view hierarchy can touch its views"). If I move .observeOn to the beginning of the chain, the second webService call runs on the main thread.

How can I switch ("swim") between threads multiple times in an RxJava chain?

asked Sep 06 '17 08:09 by Евгений Кравцов




1 Answer

Building on Samuel's answer, you could do it with an even simpler, non-nested syntax:

webService.requestGroups()
.subscribeOn(Schedulers.io()) // the source (requestGroups) runs on the IO thread
.observeOn(AndroidSchedulers.mainThread()) // everything below runs on the main thread
.map(group -> {
    view.showGroup(group);
    return group;
})
.observeOn(Schedulers.io()) // everything below runs on the IO thread
.flatMap(group -> webService.requestItems(group))
.toList()
.observeOn(AndroidSchedulers.mainThread()) // everything below runs on the main thread
.subscribe(items -> view.showItems(items));

Two rules of thumb here:

  1. subscribeOn determines the thread on which the source observable starts executing. Its position in the chain generally does not matter, and it should appear only once; if it appears several times, the one closest to the source takes effect.
  2. observeOn sets the thread on which all subsequent operators execute, until the next observeOn is encountered. It may appear several times in the chain to move different pieces of code onto different threads, as in the example above.
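
The thread hops described by these two rules can be tried out without Android or RxJava on the classpath. What follows is a minimal plain-JDK sketch, not RxJava: it uses CompletableFuture with two single-thread executors standing in for Schedulers.io() and AndroidSchedulers.mainThread(). Here supplyAsync plays roughly the role of subscribeOn, and each thenApplyAsync plays roughly the role of an observeOn followed by one operator. The class name ThreadHopSketch, the thread names "io" and "main-like", and the fake group and item strings are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the RxJava chain: each stage names the executor it runs on.
class ThreadHopSketch {
    // Collects "step@thread" entries so the hops can be inspected afterwards.
    static final List<String> LOG = new CopyOnWriteArrayList<>();

    static void record(String step) {
        LOG.add(step + "@" + Thread.currentThread().getName());
    }

    static List<String> run() {
        LOG.clear();
        // Assumed stand-ins: "io" for Schedulers.io(), "main-like" for the Android main thread.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        try {
            return CompletableFuture
                // Source work starts on the background executor (compare: subscribeOn).
                .supplyAsync(() -> {
                    record("requestGroups");
                    return Arrays.asList("g1", "g2");
                }, io)
                // Hop to the main-like thread for the first UI step (compare: observeOn).
                .thenApplyAsync(groups -> {
                    record("showGroups");
                    return groups;
                }, main)
                // Hop back to the background executor for the second "network" step.
                .thenApplyAsync(groups -> {
                    record("requestItems");
                    List<String> items = new ArrayList<>();
                    for (String group : groups) {
                        items.add(group + "-item");
                    }
                    return items;
                }, io)
                // Final hop to the main-like thread for the last UI step.
                .thenApplyAsync(items -> {
                    record("showItems");
                    return items;
                }, main)
                .join();
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
        LOG.forEach(System.out::println);
    }
}
```

Running main prints the collected items followed by one log line per step, with the thread name alternating between "io" and "main-like". That is the same pattern the chain above produces by alternating observeOn(Schedulers.io()) and observeOn(AndroidSchedulers.mainThread()).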
answered Nov 15 '22 20:11 by javaxian
