
Use cases for RxJava schedulers

People also ask

What are schedulers in RxJava?

Android Scheduler: this Scheduler is provided by the RxAndroid library. It brings execution back to the main thread so that UI modifications can be made, and it is usually passed to the observeOn() method.

When should I use RxJava?

RxJava provides a standard workflow for managing data and events across an application: create an Observable, give the Observable some data to emit, create an Observer, and subscribe the Observer to the Observable. RxJava is becoming more and more popular, particularly among Android developers.

What is the difference between RxJava and RxAndroid?

RxAndroid is an extension of RxJava that is used only in Android applications. It adds the main-thread scheduling that Android requires: multithreading on Android relies on a Looper and a Handler for main-thread execution, and RxAndroid exposes these through AndroidSchedulers.

What is subscribeOn in RxJava?

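The subscribeOn() operator tells the source Observable which thread to subscribe and emit on. Unless a later observeOn() switches threads, items are pushed on that thread all the way down to the Observer (hence it affects both upstream and downstream operators). It does not matter where you put subscribeOn() in your chain of operators, but if it appears more than once, only the call closest to the source determines the emitting thread.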
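
The placement rule for subscribeOn() can be checked with a small sketch. It assumes RxJava 2.x on the classpath (the io.reactivex packages; RxJava 3 moved them under io.reactivex.rxjava3), and the class and method names are made up for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

final class SubscribeOnDemo {

    /** Returns the name of the thread on which the source actually emitted. */
    static String sourceThread() {
        return Observable.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.io())           // closest to the source: decides the emitting thread
                .map(name -> name)                      // runs on the same thread, no observeOn() in between
                .subscribeOn(Schedulers.computation())  // further downstream: does not move the source
                .blockingFirst();
    }

    public static void main(String[] args) {
        // The printed name should belong to the io() pool, not the computation() pool.
        System.out.println(sourceThread());
    }
}
```

Swapping the two subscribeOn() calls should move the source to a computation() thread instead.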


Great questions, I think the documentation could do with some more detail.

  1. io() is backed by an unbounded thread-pool and is the sort of thing you'd use for non-computationally intensive tasks, that is stuff that doesn't put much load on the CPU. So yep interaction with the file system, interaction with databases or services on a different host are good examples.
  2. computation() is backed by a bounded thread-pool with size equal to the number of available processors. If you tried to schedule CPU intensive work in parallel across more than the available processors (say using newThread()) then you are in for thread creation overhead and context switching overhead as threads vie for a processor, and it's potentially a big performance hit.
  3. It's best to leave computation() for CPU intensive work only, otherwise you won't get good CPU utilization.
  4. It's bad to call io() for computational work for the reason discussed in 2. io() is unbounded, and if you schedule a thousand computational tasks on io() in parallel then each of those thousand tasks will have its own thread and be competing for CPU, incurring context switching costs.
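
The difference between an unbounded and a bounded pool can be seen with plain JDK executors. This is only an analogy, not RxJava's actual implementation: a cached pool stands in for io(), a fixed pool for computation(), and all names are made up for the sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

final class PoolGrowthDemo {

    /** Threads created by an unbounded (cached) pool when all tasks are busy at once. */
    static int cachedPoolThreads(int tasks) {
        return threadsCreated((ThreadPoolExecutor) Executors.newCachedThreadPool(), tasks);
    }

    /** Threads created by a bounded (fixed) pool for the same workload. */
    static int fixedPoolThreads(int size, int tasks) {
        return threadsCreated((ThreadPoolExecutor) Executors.newFixedThreadPool(size), tasks);
    }

    private static int threadsCreated(ThreadPoolExecutor pool, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> awaitQuietly(release)); // every task stays busy until released
        }
        int created = pool.getLargestPoolSize();
        release.countDown();                           // let the tasks finish
        pool.shutdown();                               // workers exit once the queue is drained
        return created;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        int tasks = cores * 4;
        System.out.println("cached pool threads: " + cachedPoolThreads(tasks));
        System.out.println("fixed pool threads:  " + fixedPoolThreads(cores, tasks));
    }
}
```

The cached pool creates one thread per concurrent task, while the fixed pool stops at its size and queues the rest, which is the behaviour points 2 and 4 warn about.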

The most important point is that both Schedulers.io and Schedulers.computation are backed by multi-threaded pools (unbounded for io, bounded by the number of available processors for computation), as opposed to the others mentioned in the question. This characteristic is shared only by Schedulers.from(Executor) when the Executor is itself a thread pool, for example one created with newCachedThreadPool (unbounded, with automatic thread reclamation).

As abundantly explained in previous responses and multiple articles on the web, Schedulers.io and Schedulers.computation should be used carefully, as they are optimized for the type of work in their name. But, in my view, their most important role is to provide real concurrency to reactive streams.

Contrary to what newcomers often believe, reactive streams are not inherently concurrent but inherently asynchronous and sequential. For this very reason, Schedulers.io should be used only when the I/O operation is blocking (e.g. a blocking call such as Apache Commons IO FileUtils.readFileToString(...)) and would therefore freeze the calling thread until the operation is done.

Using an asynchronous method such as Java AsynchronousFileChannel(...) wouldn't block the calling thread during the operation so there is no point in using a separate thread. In fact, Schedulers.io threads are not really a good fit for asynchronous operations as they don't run an event loop and the callback would never... be called.
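
To make the contrast concrete, here is a minimal JDK-only sketch of a non-blocking read with AsynchronousFileChannel. The class name, helper methods and temp-file content are made up for illustration, and it assumes a small file that fits in a single read.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;

final class AsyncReadDemo {

    /** Starts the read and returns at once; the future completes from the channel's callback. */
    static CompletableFuture<String> readAsync(Path file) {
        CompletableFuture<String> result = new CompletableFuture<>();
        AsynchronousFileChannel channel = null;
        try {
            channel = AsynchronousFileChannel.open(file, StandardOpenOption.READ);
            ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());
            channel.read(buffer, 0, channel, new CompletionHandler<Integer, AsynchronousFileChannel>() {
                @Override
                public void completed(Integer bytesRead, AsynchronousFileChannel ch) {
                    closeQuietly(ch);
                    buffer.flip();
                    result.complete(StandardCharsets.UTF_8.decode(buffer).toString());
                }

                @Override
                public void failed(Throwable error, AsynchronousFileChannel ch) {
                    closeQuietly(ch);
                    result.completeExceptionally(error);
                }
            });
        } catch (IOException e) {
            closeQuietly(channel);
            result.completeExceptionally(e);
        }
        return result;
    }

    private static void closeQuietly(AsynchronousFileChannel ch) {
        if (ch == null) {
            return;
        }
        try {
            ch.close();
        } catch (IOException ignored) {
            // nothing useful to do in a demo
        }
    }

    /** Writes a small temp file, reads it back asynchronously and returns its content. */
    static String demo() {
        try {
            Path file = Files.createTempFile("async-read", ".txt");
            Files.write(file, "hello from disk".getBytes(StandardCharsets.UTF_8));
            CompletableFuture<String> pending = readAsync(file);
            // The calling thread is free to do other work here; no extra Scheduler is involved.
            String content = pending.join(); // blocks only because the demo needs the value now
            Files.delete(file);
            return content;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The completion handler runs on a thread that belongs to the channel's own group, so the caller does not need to supply one.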

The same logic applies for database access or remote API calls. Don't use the Schedulers.io if you can use an asynchronous or reactive API to make the call.

Back to concurrency. You may not have access to an async or reactive API to do I/O operations asynchronously or concurrently, so your only alternative is to dispatch multiple calls on separate threads. Alas, reactive streams are sequential at their ends, but the good news is that the flatMap() operator can introduce concurrency at their core.

Concurrency must be built into the stream construct, typically using the flatMap() operator. This powerful operator can be configured to internally provide a multi-threaded context to your flatMap() embedded Function<T, R>. That context is provided by a multi-threaded Scheduler such as Schedulers.io or Schedulers.computation.
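
A minimal sketch of that pattern, assuming RxJava 2.x (io.reactivex packages) on the classpath. blockingFetch is a hypothetical stand-in for any blocking call, and the 200 ms sleep is an arbitrary figure chosen for the demo.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;

final class FlatMapConcurrencyDemo {

    /** Hypothetical blocking call (stands in for a file read, JDBC query, HTTP request...). */
    static String blockingFetch(int id) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "item-" + id + " on " + Thread.currentThread().getName();
    }

    /** Runs one blocking call per id, each on its own io() worker, and collects the results. */
    static List<String> fetchAll(int count) {
        return Observable.range(1, count)
                .flatMap(id -> Observable.fromCallable(() -> blockingFetch(id))
                        .subscribeOn(Schedulers.io())) // each inner stream gets its own thread
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> results = fetchAll(5);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        results.forEach(System.out::println);
        System.out.println("elapsed: " + elapsedMs + " ms");
    }
}
```

Because the inner Observables run concurrently, results may arrive in any order. Moving subscribeOn() outside the flatMap() would put the whole chain on a single thread and make the calls sequential again.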

Find more details in the articles on RxJava2 Schedulers and Concurrency, where you'll find code samples and detailed explanations of how to use Schedulers sequentially and concurrently.

Hope this helps,

Softjake


This blog post provides an excellent answer

From the blog post:

Schedulers.io() is backed by an unbounded thread pool. It is used for non CPU-intensive I/O type work including interaction with the file system, performing network calls, database interactions, etc. This thread pool is intended to be used for asynchronously performing blocking IO.

Schedulers.computation() is backed by a bounded thread pool with size up to the number of available processors. It is used for computational or CPU-intensive work such as resizing images, processing large data sets, etc. Be careful: when you allocate more computation threads than available cores, performance will degrade due to context switching and thread creation overhead as threads vie for processors’ time.

Schedulers.newThread() creates a new thread for each unit of work scheduled. This scheduler is expensive, as a new thread is spawned every time and no reuse happens.

Schedulers.from(Executor executor) creates and returns a custom scheduler backed by the specified executor. To limit the number of simultaneous threads in the thread pool, use Schedulers.from(Executors.newFixedThreadPool(n)). This guarantees that if a task is scheduled when all threads are occupied, it will be queued. The threads in the pool will exist until it is explicitly shut down.
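
As a rough sketch of a bounded custom scheduler, again assuming RxJava 2.x package names; the class name, method name and the 50 ms sleep are illustrative only.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BoundedSchedulerDemo {

    /** Runs {@code tasks} blocking jobs on a pool of {@code poolSize} threads; returns how many threads were used. */
    static int distinctThreads(int poolSize, int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        Scheduler bounded = Schedulers.from(executor);
        try {
            List<String> names = Observable.range(1, tasks)
                    .flatMap(i -> Observable.fromCallable(() -> {
                                Thread.sleep(50);                        // simulated blocking work
                                return Thread.currentThread().getName();
                            })
                            .subscribeOn(bounded))                       // extra tasks wait in the executor's queue
                    .toList()
                    .blockingGet();
            return new HashSet<>(names).size();
        } finally {
            executor.shutdown();  // the pool's threads live until the executor is shut down
        }
    }

    public static void main(String[] args) {
        System.out.println("threads used: " + distinctThreads(2, 6));
    }
}
```

The count never exceeds the pool size, however many tasks are submitted.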

Main thread or AndroidSchedulers.mainThread() is provided by the RxAndroid extension library to RxJava. The main thread (also known as the UI thread) is where user interaction happens. Care should be taken not to overload this thread, to prevent a janky, non-responsive UI or, worse, an “Application Not Responding” (ANR) dialog.

Schedulers.single() is new in RxJava 2. This scheduler is backed by a single thread executing tasks sequentially in the order requested.

Schedulers.trampoline() executes tasks in a FIFO (First In, First Out) manner by one of the participating worker threads. It’s often used when implementing recursion to avoid growing the call stack.
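
The queueing behaviour of trampoline() can be observed by scheduling a task from inside another one. This sketch assumes RxJava 2.x package names, and the class name is made up.

```java
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.List;

final class TrampolineDemo {

    /** Records the order in which an outer task and a task it schedules are run. */
    static List<String> executionOrder() {
        List<String> log = new ArrayList<>();
        Scheduler.Worker worker = Schedulers.trampoline().createWorker();
        worker.schedule(() -> {
            log.add("outer start");
            worker.schedule(() -> log.add("inner")); // queued, not run recursively
            log.add("outer end");
        });
        worker.dispose();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(executionOrder()); // [outer start, outer end, inner]
    }
}
```

The nested task waits until the outer one has returned, so the call stack does not grow with each level of scheduling.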