Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What thread is unSubscribeOn called? Should we call it?

What thread is unsubscribeOn defaulted to when we do not specify it but still specify the thread for subscribeOn? Are we required to specify the thread that we want un-subscription to happen on even when it is the same thread as the one used in subscribeOn?

Or are the bottom two snippets doing the same for un-subscription?

Option 1:

mSubscription.add(myAppProvider
        .getSomeData()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .unsubscribeOn(Schedulers.io())
        .subscribe(data -> handleData(data),
                throwable -> handleError(throwable)
                ));

Option 2:

mSubscription.add(myAppProvider
        .getSomeData()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(data -> handleData(data),
                throwable -> handleError(throwable)
                ));

I did look at the Rx-Java docs but they only explain subscribeOn but nothing about unSubscribeOn

like image 514
achie Avatar asked Nov 08 '16 21:11

achie


2 Answers

Those snippets have different behaviours.

In general, unsubscription travels up the operator sequence and can be initiated by any thread at all (just call subscriber.unsubscribe() from any thread). Without the presence of an unsubscribeOn operator the unsubscribe action will probably complete its action on the thread it was called from. unsubscribeOn provides finer control of the thread used for unsubscription upstream of it.

like image 143
Dave Moten Avatar answered Oct 16 '22 04:10

Dave Moten


Without subscribeOn (and without observeOn), your unsubscription actions will occur on whatever thread the subscription started.

With subscribeOn, your unsubscription actions will occur on the Scheduler specified by subscribeOn.

With observeOn, your unsubscription actions will occur on the Scheduler specified by observeOn (overriding the Scheduler specified by subscribeOn).

Here is a sample. As suggested there, this is useful when the unsubscription itself involves long-running operations that you want to run on some other thread.

If you run their test code:

Observable<Object> source = Observable.using(
    () -> {
        System.out.println("Subscribed on " + Thread.currentThread().getId());
        return Arrays.asList(1,2);
    },
    (ints) -> {
        System.out.println("Producing on " + Thread.currentThread().getId());
        return Observable.from(ints);
    },
    (ints) -> {
        System.out.println("Unubscribed on " + Thread.currentThread().getId());
    }
);

source
    .unsubscribeOn(Schedulers.newThread())
    .subscribe(System.out::println);

You should see their expected output:

 Subscribed on 1
 Producing on 1
 1
 2
 Unubscribed on 11

If you remove that unsubscribeOn line, you'll see:

 Unsubscribed on 1
like image 4
drhr Avatar answered Oct 16 '22 06:10

drhr