Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Schedulers: Immediate vs. CurrentThread

After reading the explanation for why

Observable.Return(5)
  .Repeat()
  .Take(1)

never completes, but

Observable.Return(5, Scheduler.CurrentThread)
  .Repeat()
  .Take(1)

works as expected. I am still confused and I can't tell why CurrentThread actually solves the problem. Can somebody give a clear explanation?

like image 739
Sergi Mansilla Avatar asked Jun 23 '15 23:06

Sergi Mansilla


People also ask

What is RX scheduler?

Android Scheduler — This Scheduler is provided by rxAndroid library. This is used to bring back the execution to the main thread so that UI modification can be made. This is usually used in observeOn method.

What is reactor Scheduler?

Reactive Streams provide a standard for asynchronous stream processing. We achieve asynchronous/non-blocking behavior by scheduling tasks on worker threads. Creating and managing threads ourselves is not an easy task.


1 Answers

The link provided by Ned Stoyanov in the comments above has a great explanation by Dave Sexton.

I'll try to illustrate it a bit differently. Take this example where a recursive call occurs in the RecursiveMethod.

public class RecursiveTest()
{
    private bool _isDone;

    public void RecursiveMethod()
    {
        if (!_isDone)
        {
            RecursiveMethod();

           // Never gets here...
           _isDone = true;
        }
    }  
}

You can easily see that this will recurse indefinitely (until a StackOverflowException) because _isDone will never gets set to true. It is an overly simplified example, but it is basically what's going on with your first example.

This is the explanation by Dave Sexton to describe what happens in your first example.

By default, Return uses the ImmediateScheduler to call OnNext(1) then OnCompleted(). Repeat does not introduce any concurrency, so it sees OnCompleted immediately and then immediately resubscribes to Return. Because there's no trampoline in Return, this pattern repeats itself, blocking the current thread indefinitely. Calling Subscribe on this observable never returns.

In other words, because of the infinite loop of reentrancy, the initial flow never gets fully completed. So we need a way to complete the initial flow without the reentrancy.

Let's go back to my RecursiveTest example above in this post, what would be the solution to avoid infinite recursion? We would need the RecursiveMethod to complete its flow before executing again the RecursiveMethod. One way to do this is to have a queue and enqueue the call to the RecursiveMethod like this:

public void RecursiveMethod()
{
    if (!_isDone)
    {
        Enqueue(RecursiveMethod);
        _isDone = true;
    }
}  

This way, the initial flow would complete, _isDone would be set to true and when the next call to RecursiveMethod is executed, nothing will get executed anymore avoiding the infinite recursion. And this is pretty much what the Scheduler.CurrentThread will do to your second example.

Let's see how Dave Sexton explains how your second example works:

Here, Return is using the CurrentTheadScheduler to call OnNext(1) then OnCompleted(). Repeat does not introduce any concurrency, so it sees OnCompleted immediately and then immediately resubscribes to Return; however, this second subscription to Return schedules its (inner) actions on the trampoline because it's still executing on the OnCompleted callback from the first scheduled (outer) action, thus the repetition does not occur immediately. This allows Repeat to return a disposable to Take, which eventually calls OnCompleted, cancels the repetition by disposing Repeat, and ultimately the call from Subscribe returns.

Again my example was really simplified to make it easy to understand and it's not exactly how it works. Here you can see how the scheduler really works. It uses what they call a Trampoline which is basically a queue that makes sure that there is no reentrant calls. All calls are therefore serialized one after the other on the same thread. By doing so, the initial flow can be completed which avoids the infinite reentrant loop.

Hope this is a bit clearer :)

like image 57
Absolom Avatar answered Oct 08 '22 03:10

Absolom