Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

ObserveOn and SubscribeOn - where the work is being done

Based on reading this question: What's the difference between SubscribeOn and ObserveOn

ObserveOn sets where the code is in the Subscribe handler is is executed:

stream.Subscribe(_ => { // this code here });

The SubscribeOn method sets which thread the setup of the stream is done on.

I'm led to understand that if these aren't explicitly set, then the TaskPool is used.

Now my question is, lets say I do something like this:

Observable.Interval(new Timespan(0, 0, 1))           .Where(t => predicate(t))           .SelectMany(t => lots_of(t))           .ObserveOnDispatcher()           .Subscribe(t => some_action(t)); 

Where are the Where predicate and SelectMany lots_of being executed, given that some_action is being executed on the dispatcher?

like image 383
Cheetah Avatar asked Dec 08 '13 09:12

Cheetah


People also ask

What is observeOn and subscribeOn?

observeOn works only downstream. subscribeOn works downstream and upstream. consecutive subscribeOns do not change the thread. consequent observeOns do change the thread. Thread change by an observeOn cannot be overridden by a subscribeOn.

What is subscribeOn?

subscribeOn() operator tells the source Observable which thread to emit and push items on all the way down to Observer (hence, it affects both upstream and downstream operators). It does not matter where you put the subscribeOn() in your Observable chain of operators.

What is Schedulers in RxJava?

Android Scheduler — This Scheduler is provided by rxAndroid library. This is used to bring back the execution to the main thread so that UI modification can be made. This is usually used in observeOn method.

What is observeOn in Rxjava?

ObserveOn specify the Scheduler on which an observer will observe this Observable. So basically SubscribeOn is mostly subscribed (executed) on a background thread ( you do not want to block the UI thread while waiting for the observable) and also in ObserveOn you want to observe the result on a main thread...


1 Answers

There's a lot of misleading info out there about SubscribeOn and ObserveOn.

Summary

  • SubscribeOn intercepts calls to the single method of IObservable<T>, which is Subscribe, and calls to Dispose on the IDisposable handle returned by Subscribe.
  • ObserveOn intercepts calls to the methods of IObserver<T>, which are OnNext, OnCompleted & OnError.
  • Both methods cause the respective calls to be made on the specified scheduler.

Analysis & Demonstrations

The statement

ObserveOn sets where the code in the Subscribe handler is executed:

is more confusing than helpful. What you are referring to as the "Subscribe handler" is really an OnNext handler. Remember, the Subscribe method of IObservable accepts an IObserver that has OnNext, OnCompleted and OnError methods, but it is extension methods that provide the convenience overloads that accept lambdas and build an IObserver implementation for you.

Let me appropriate the term though; I think of the "Subscribe handler" being the code in the observable that is invoked when Subscribe is called. In this way, the description above more closely resembles the purpose of SubscribeOn.

SubscribeOn

SubscribeOn causes the Subscribe method of an observable to be executed asynchronously on the specified scheduler or context. You use it when you don't want to call the Subscribe method on an observable from whatever thread you are running on - typically because it can be long-running and you don't want to block the calling thread.

When you call Subscribe, you are calling an observable that may be part of a long chain of observables. It's only the observable that SubscribeOn is applied to that it effects. Now it may be the case that all the observables in the chain will be subscribed to immediately and on the same thread - but it doesn't have to be the case. Think about Concat for example - that only subscribes to each successive stream once the preceding stream has finished, and typically this will take place on whatever thread the preceding stream called OnCompleted from.

So SubscribeOn sits between your call to Subscribe and the observable you are subscribing to, intercepting the call and making it asynchronous.

It also affects disposal of subscriptions. Subscribe returns an IDisposable handle which is used to unsubscribe. SubscribeOn ensures calls to Dispose are scheduled on the supplied scheduler.

A common point of confusion when trying to understand what SubscribeOn does is that the Subscribe handler of an observable may well call OnNext, OnCompleted or OnError on this same thread. However, its purpose is not to affect these calls. It's not uncommon for a stream to complete before the Subscribe method returns. Observable.Return does this, for example. Let's take a look.

If you use the Spy method I wrote, and run the following code:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.Subscribe(); Console.WriteLine("Subscribe returned"); 

You get this output (thread id may vary of course):

Calling from Thread: 1 Return: Observable obtained on Thread: 1 Return: Subscribed to on Thread: 1 Return: OnNext(1) on Thread: 1 Return: OnCompleted() on Thread: 1 Return: Subscription completed. Subscribe returned 

You can see that the entire subscription handler ran on the same thread, and finished before returning.

Let's use SubscribeOn to run this asynchronously. We will Spy on both the Return observable and the SubscribeOn observable:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe(); Console.WriteLine("Subscribe returned"); 

This outputs (line numbers added by me):

01 Calling from Thread: 1 02 Return: Observable obtained on Thread: 1 03 SubscribeOn: Observable obtained on Thread: 1 04 SubscribeOn: Subscribed to on Thread: 1 05 SubscribeOn: Subscription completed. 06 Subscribe returned 07 Return: Subscribed to on Thread: 2 08 Return: OnNext(1) on Thread: 2 09 SubscribeOn: OnNext(1) on Thread: 2 10 Return: OnCompleted() on Thread: 2 11 SubscribeOn: OnCompleted() on Thread: 2 12 Return: Subscription completed. 

01 - The main method is running on thread 1.

02 - the Return observable is evaluated on the calling thread. We're just getting the IObservable here, nothing is subscribing yet.

03 - the SubscribeOn observable is evaluated on the calling thread.

04 - Now finally we call the Subscribe method of SubscribeOn.

05 - The Subscribe method completes asynchronously...

06 - ... and thread 1 returns to the main method. This is the effect of SubscribeOn in action!

07 - Meanwhile, SubscribeOn scheduled a call on the default scheduler to Return. Here it is received on thread 2.

08 - And as Return does, it calls OnNext on the Subscribe thread...

09 - and SubscribeOn is just a pass through now.

10,11 - Same for OnCompleted

12 - And last of all the Return subscription handler is done.

Hopefully that clears up the purpose and effect of SubscribeOn!

ObserveOn

If you think of SubscribeOn as an interceptor for the Subscribe method that passes the call on to a different thread, then ObserveOn does the same job, but for the OnNext, OnCompleted and OnError calls.

Recall our original example:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.Subscribe(); Console.WriteLine("Subscribe returned"); 

Which gave this output:

Calling from Thread: 1 Return: Observable obtained on Thread: 1 Return: Subscribed to on Thread: 1 Return: OnNext(1) on Thread: 1 Return: OnCompleted() on Thread: 1 Return: Subscription completed. Subscribe returned 

Now lets alter this to use ObserveOn:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned"); 

We get the following output:

01 Calling from Thread: 1 02 Return: Observable obtained on Thread: 1 03 ObserveOn: Observable obtained on Thread: 1 04 ObserveOn: Subscribed to on Thread: 1 05 Return: Subscribed to on Thread: 1 06 Return: OnNext(1) on Thread: 1 07 ObserveOn: OnNext(1) on Thread: 2 08 Return: OnCompleted() on Thread: 1 09 Return: Subscription completed. 10 ObserveOn: Subscription completed. 11 Subscribe returned 12 ObserveOn: OnCompleted() on Thread: 2 

01 - The main method is running on Thread 1.

02 - As before, the Return observable is evaluated on the calling thread. We're just getting the IObservable here, nothing is subscribing yet.

03 - The ObserveOn observable is evaluated on the calling thread too.

04 - Now we subscribe, again on the calling thread, first to the ObserveOn observable...

05 - ... which then passes the call through to the Return observable.

06 - Now Return calls OnNext in its Subscribe handler.

07 - Here is the effect of ObserveOn. We can see that the OnNext is scheduled asynchronously on Thread 2.

08 - Meanwhile Return calls OnCompleted on Thread 1...

09 - And Return's subscription handler completes...

10 - and then so does ObserveOn's subscription handler...

11 - so control is returned to the main method

12 - Meanwhile, ObserveOn has shuttled Return's OnCompleted call this over to Thread 2. This could have happened at any time during 09-11 because it is running asynchronously. Just so happens it's finally called now.

What are the typical use cases?

You will most often see SubscribeOn used in a GUI when you need to Subscribe to a long running observable and want to get off the dispatcher thread as soon as possible - maybe because you know it's one of those observables that does all it's work in the subscription handler. Apply it at the end of the observable chain, because this is the first observable called when you subscribe.

You will most often see ObserveOn used in a GUI when you want to ensure OnNext, OnCompleted and OnError calls are marshalled back to the dispatcher thread. Apply it at the end of the observable chain to transition back as late as possible.

Hopefully you can see that the answer to your question is that ObserveOnDispatcher won't make any difference to the threads that Where and SelectMany are executed on - it all depends what thread stream is calling them from! stream's subscription handler will be invoked on the calling thread, but it's impossible to say where Where and SelectMany will run without knowing how stream is implemented.

Observables with lifetimes that outlive the Subscribe call

Up until now, we've been looking exclusively at Observable.Return. Return completes its stream within the Subscribe handler. That's not atypical, but it's equally common for streams to outlive the Subscribe handler. Look at Observable.Timer for example:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.Subscribe(); Console.WriteLine("Subscribe returned"); 

This returns the following:

Calling from Thread: 1 Timer: Observable obtained on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. Subscribe returned Timer: OnNext(0) on Thread: 2 Timer: OnCompleted() on Thread: 2 

You can clearly see the subscription to complete and then OnNext and OnCompleted being called later on a different thread.

Note that no combination of SubscribeOn or ObserveOn will have any effect whatsoever on which thread or scheduler Timer choses to invoke OnNext and OnCompleted on.

Sure, you can use SubscribeOn to determine the Subscribe thread:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe(); Console.WriteLine("Subscribe returned"); 

(I am deliberately changing to the NewThreadScheduler here to prevent confusion in the case of Timer happening to get the same thread pool thread as SubscribeOn)

Giving:

Calling from Thread: 1 Timer: Observable obtained on Thread: 1 SubscribeOn: Observable obtained on Thread: 1 SubscribeOn: Subscribed to on Thread: 1 SubscribeOn: Subscription completed. Subscribe returned Timer: Subscribed to on Thread: 2 Timer: Subscription completed. Timer: OnNext(0) on Thread: 3 SubscribeOn: OnNext(0) on Thread: 3 Timer: OnCompleted() on Thread: 3 SubscribeOn: OnCompleted() on Thread: 3 

Here you can clearly see the main thread on thread (1) returning after its Subscribe calls, but the Timer subscription getting its own thread (2), but the OnNext and OnCompleted calls running on thread (3).

Now for ObserveOn, let's change the code to (for those following along in code, use nuget package rx-wpf):

var dispatcher = Dispatcher.CurrentDispatcher; Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned"); 

This code is a little different. The first line ensures we have a dispatcher, and we also bring in ObserveOnDispatcher - this is just like ObserveOn, except it specifies we should use the DispatcherScheduler of whatever thread ObserveOnDispatcher is evaluated on.

This code gives the following output:

Calling from Thread: 1 Timer: Observable obtained on Thread: 1 ObserveOn: Observable obtained on Thread: 1 ObserveOn: Subscribed to on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. ObserveOn: Subscription completed. Subscribe returned Timer: OnNext(0) on Thread: 2 ObserveOn: OnNext(0) on Thread: 1 Timer: OnCompleted() on Thread: 2 ObserveOn: OnCompleted() on Thread: 1 

Note that the dispatcher (and main thread) are thread 1. Timer is still calling OnNext and OnCompleted on the thread of its choosing (2) - but the ObserveOnDispatcher is marshalling calls back onto the dispatcher thread, thread (1).

Also note that if we were to block the dispatcher thread (say by a Thread.Sleep) you would see that the ObserveOnDispatcher would block (this code works best inside a LINQPad main method):

var dispatcher = Dispatcher.CurrentDispatcher; Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned"); Console.WriteLine("Blocking the dispatcher"); Thread.Sleep(2000); Console.WriteLine("Unblocked"); 

And you'll see output like this:

Calling from Thread: 1 Timer: Observable obtained on Thread: 1 ObserveOn: Observable obtained on Thread: 1 ObserveOn: Subscribed to on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. ObserveOn: Subscription completed. Subscribe returned Blocking the dispatcher Timer: OnNext(0) on Thread: 2 Timer: OnCompleted() on Thread: 2 Unblocked ObserveOn: OnNext(0) on Thread: 1 ObserveOn: OnCompleted() on Thread: 1 

With the calls through the ObserveOnDispatcher only able to get out once the Sleep has run.

Key points

It's useful to keep in mind that Reactive Extensions is essentially a free-threaded library, and tries to be as lazy as possible about what thread it runs on - you have to deliberately interfere with ObserveOn, SubscribeOn and passing specific schedulers to operators that accept them to change this.

There's nothing a consumer of an observable can do to control what it's doing internally - ObserveOn and SubscribeOn are decorators that wrap the surface area of observers and observables to marshal calls across threads. Hopefully these examples have made that clear.

like image 66
James World Avatar answered Sep 19 '22 10:09

James World