I have some trouble understanding how subscribeOn/observeOn work in RxJava. I've created a simple app with an observable that emits solar system planet names, does some mapping and filtering, and prints the results.
As I understand it, scheduling work on a background thread is done via the subscribeOn operator (and it seems to work fine).
Observing on a background thread also works fine with the observeOn operator.
But I have trouble understanding how to observe on the calling thread (whether it is the main thread or any other). It is easily done on Android with the AndroidSchedulers.mainThread() scheduler, but I don't know how to achieve this in pure Java.
Here's my code:
    public class Main {

        public static void main(String[] args) throws InterruptedException {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
            System.out.println("Main thread: " + getCurrentThreadInfo());

            Observable<String> stringObservable = Observable
                    .from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
                    .map(in -> {
                        System.out.println("map on: " + getCurrentThreadInfo());
                        return in.toUpperCase();
                    })
                    .filter(in -> {
                        System.out.println("filter on: " + getCurrentThreadInfo());
                        return in.contains("A");
                    })
                    .subscribeOn(Schedulers.from(executor));

            for (int i = 0; i < 5; i++) {
                Thread thread = new Thread("Thread-" + i) {
                    @Override
                    public void run() {
                        stringObservable
                                .buffer(5)
                                .subscribe(s -> System.out.println("Result " + s + " on: " + getCurrentThreadInfo()));
                    }
                };
                thread.start();
            }
        }

        private static String getCurrentThreadInfo() {
            return Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")";
        }
    }
The Observable is created and the work is subscribed on one of the three threads from the executor. This works as expected. But how do I observe the results on the threads created dynamically in the for loop? Is there a way to create a Scheduler from the current thread?
Also, I've found out that after running this code, it never terminates and I don't know why? :(
To answer your question, let me start from the beginning, so that other readers can follow what you already know.
Schedulers
Schedulers play the same role as Executors for Java. Briefly - they decide on which thread actions are executed.
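To make the Executor analogy concrete, here is a minimal sketch that uses only JDK classes (no RxJava). The class name ExecutorAnalogy and both helper methods are made up for this illustration. It runs the same task through two executors and reports which thread actually executed it:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: the executor, not the task, decides which thread does the work.
class ExecutorAnalogy {

    // Runs a task on the given executor and returns the name of the thread that executed it.
    static String threadUsedBy(Executor executor) {
        CompletableFuture<String> name = new CompletableFuture<>();
        executor.execute(() -> name.complete(Thread.currentThread().getName()));
        return name.join(); // blocks until the task has run
    }

    // Same as above, but on a single-threaded pool whose thread carries the given name.
    static String threadUsedByWorker(String workerName) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, workerName));
        try {
            return threadUsedBy(pool);
        } finally {
            pool.shutdown(); // let the non-daemon worker thread exit
        }
    }

    public static void main(String[] args) {
        // Runnable::run is an Executor that runs each task directly on the calling thread.
        System.out.println("direct executor ran on: " + threadUsedBy(Runnable::run));
        System.out.println("pool executor ran on:   " + threadUsedByWorker("worker-1"));
    }
}
```

The task itself is identical in both calls; only the executor differs. A Scheduler plays that deciding role for an Observable or an operator.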
Usually an Observable and its operators execute on the current thread, that is, the thread that calls subscribe. Sometimes you can pass a Scheduler to an Observable or operator as a parameter (e.g. Observable.timer()).
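Additionally, RxJava provides two operators to specify a Scheduler:
subscribeOn - specifies the Scheduler on which the Observable operates
observeOn - specifies the Scheduler on which an observer observes the Observable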
To understand them quickly, I use the following example code.
In all samples, I will use the helper createObservable, which emits the name of the thread on which the Observable operates:
    public static Observable<String> createObservable() {
        return Observable.create((Subscriber<? super String> subscriber) -> {
            subscriber.onNext(Thread.currentThread().getName());
            subscriber.onCompleted();
        });
    }
Without schedulers:
    createObservable().subscribe(message -> {
        System.out.println("Case 1 Observable thread " + message);
        System.out.println("Case 1 Observer thread " + Thread.currentThread().getName());
    });
    // will print:
    // Case 1 Observable thread main
    // Case 1 Observer thread main
SubscribeOn:
    createObservable()
            .subscribeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 2 Observable thread " + message);
                System.out.println("Case 2 Observer thread " + Thread.currentThread().getName());
            });
    // will print:
    // Case 2 Observable thread RxNewThreadScheduler-1
    // Case 2 Observer thread RxNewThreadScheduler-1
SubscribeOn and ObserveOn:
    createObservable()
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 3 Observable thread " + message);
                System.out.println("Case 3 Observer thread " + Thread.currentThread().getName());
            });
    // will print:
    // Case 3 Observable thread RxNewThreadScheduler-2
    // Case 3 Observer thread RxNewThreadScheduler-1
ObserveOn:
    createObservable()
            .observeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 4 Observable thread " + message);
                System.out.println("Case 4 Observer thread " + Thread.currentThread().getName());
            });
    // will print:
    // Case 4 Observable thread main
    // Case 4 Observer thread RxNewThreadScheduler-1
Answer:
AndroidSchedulers.mainThread() returns a scheduler which delegates work to the MessageQueue associated with the main thread.
For this purpose it uses android.os.Looper.getMainLooper() and android.os.Handler.
In other words, if you would like to target a particular thread, you must provide a means to schedule and perform tasks on that thread.
Underneath, it may use any kind of message queue for storing tasks, plus logic which loops over the queue and executes the tasks.
In Java, we have Executor, which is designed for such tasks. RxJava can easily create a Scheduler from such an Executor.
Below is an example which shows how you can observe on the main thread (not particularly useful, but it shows all the required parts).
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Executor;
    import java.util.concurrent.LinkedBlockingQueue;

    import rx.Observable;
    import rx.Subscriber;
    import rx.schedulers.Schedulers;

    public class RunCurrentThread implements Executor {

        // Tasks handed over by RxJava, waiting to be run on the main thread
        private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

        public static void main(String[] args) throws InterruptedException {
            RunCurrentThread sample = new RunCurrentThread();
            sample.observerOnMain();
            sample.runLoop();
        }

        private void observerOnMain() {
            createObservable()
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(Schedulers.from(this))
                    .subscribe(message -> {
                        System.out.println("Observable thread " + message);
                        System.out.println("Observer thread " + Thread.currentThread().getName());
                    });
        }

        public Observable<String> createObservable() {
            return Observable.create((Subscriber<? super String> subscriber) -> {
                subscriber.onNext(Thread.currentThread().getName());
                subscriber.onCompleted();
            });
        }

        // Runs queued tasks on the calling thread until that thread is interrupted
        private void runLoop() throws InterruptedException {
            while (!Thread.interrupted()) {
                tasks.take().run();
            }
        }

        @Override
        public void execute(Runnable command) {
            tasks.add(command);
        }
    }
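Note that the run loop in the example above blocks until the thread is interrupted, so that program does not exit on its own either. Below is a minimal sketch of one possible way to end such a loop, using only JDK classes so it runs without RxJava. The class name CurrentThreadExecutor and the stop() and demo() methods are assumptions made for this illustration, not part of RxJava or the JDK:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical name: an Executor that runs queued tasks on whichever thread calls runLoop().
class CurrentThreadExecutor implements Executor {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private boolean running = true; // only read and written by the thread inside runLoop()

    @Override
    public void execute(Runnable command) {
        tasks.add(command);
    }

    // Queues a task that ends the loop once everything submitted before it has run.
    public void stop() {
        execute(() -> running = false);
    }

    // Blocks the calling thread and runs tasks until the stop task has been executed.
    public void runLoop() {
        try {
            while (running) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag and leave the loop
        }
    }

    // Demo: a worker thread submits three tasks, the calling thread executes them.
    static List<String> demo() {
        CurrentThreadExecutor loop = new CurrentThreadExecutor();
        List<String> executedOn = new ArrayList<>(); // only modified on the loop thread
        Thread worker = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                loop.execute(() -> executedOn.add(Thread.currentThread().getName()));
            }
            loop.stop();
        }, "worker");
        worker.start();
        loop.runLoop();
        return executedOn;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // every entry is the name of the thread that called runLoop()
    }
}
```

With RxJava, an instance of such a class could be passed to Schedulers.from(...) in the same way as RunCurrentThread above, and stop() could be called once the subscriber has received onCompleted. That wiring is not shown or tested here.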
And the last question, why your code does not terminate:
ThreadPoolExecutor uses non-daemon threads by default, so your program does not end while they are alive. You should call the executor's shutdown() method to stop the threads.
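As a rough, JDK-only sketch of that advice (the class name ShutdownDemo and the method runAndShutdown are hypothetical), the following uses the same pool configuration as the question, runs a few tasks, and then shuts the pool down so that its threads can exit:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: shows that shutdown() lets the pool threads terminate.
class ShutdownDemo {

    // Runs taskCount tasks on the pool, then shuts it down.
    // Returns true if every task ran and the pool terminated within the timeout.
    static boolean runAndShutdown(int taskCount) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            executor.execute(completed::incrementAndGet);
        }
        executor.shutdown(); // no new tasks are accepted; already queued tasks still run
        try {
            boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
            return terminated && completed.get() == taskCount;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("pool terminated cleanly: " + runAndShutdown(5));
    }
}
```

In the question's program, shutdown() would have to be called only after all five subscriptions have been handed to the executor, because tasks submitted after shutdown() are rejected. One way to wait for that point might be a CountDownLatch counted down in each subscriber's completion callback; that part is an assumption and is not shown here.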