
Applying schedulers twice in an observable chain (using compose)

I have multiple network calls in my app. I like to run each network request on an IO thread by using the compose operator with this transformer:

public static <T> Transformer<T, T> runOnIoThread()
{
    return tObservable -> tObservable.subscribeOn( Schedulers.io() )
        .observeOn( AndroidSchedulers.mainThread() );
}

This seems to work well as long as I only make single network calls. However, if I chain them as in the following example, I get Android's NetworkOnMainThreadException.

public Observable<String> networkCall1()
{
    return <NETWORK_CALL_1()>
            .compose( runOnIoThread() );
}

public Observable<String> networkCall2( String input )
{
    return <NETWORK_CALL_2(input)>
            .compose( runOnIoThread() );
}

public Observable<String> chainedCalls()
{
    return networkCall1()
            .flatMap( result1 -> networkCall2( result1 ) );
}

My assumption was that compose is applied to the complete observable chain before the call, and that a later compose call would "overwrite" the behavior of an earlier one. In fact, it looks like the observeOn call of the first compose (observeOn the main thread) dominates the second compose call (subscribeOn the IO thread). One obvious solution would be to have two versions of networkCall1: one that applies schedulers and one that doesn't. However, this would make my code quite verbose.

Do you know better solutions? Can you explain the behavior of applying schedulers twice (with compose) in an observable chain?

Edit: I'm using Retrofit with RxJava for my network calls.

Peter F asked Apr 25 '16 13:04


1 Answer

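You can only use subscribeOn() effectively once per stream: the call closest to the source decides which thread the source runs on, and a second subscribeOn() further down the stream won't change that. As such, when you chain your two methods together, the first one runs: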

observeOn(AndroidSchedulers.mainThread())

which switches the stream over to the main thread. It then stays there, because the next subscribeOn() is effectively ignored.

I would suggest that you are actually overcomplicating things with your compose method. Just add

subscribeOn(Schedulers.io())

to both your network calls and then use

observeOn(AndroidSchedulers.mainThread())

just before you want to process results on the main thread. You'd end up with something like:

public Observable<String> networkCall1()
{
    return <NETWORK_CALL_1()>
            .subscribeOn(Schedulers.io());
}

public Observable<String> networkCall2( String input )
{
    return <NETWORK_CALL_2(input)>
            .subscribeOn(Schedulers.io());
}

public Observable<String> chainedCalls()
{
    return networkCall1()
            .flatMap( result1 -> networkCall2( result1 ) )
            .observeOn(AndroidSchedulers.mainThread());
}
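
If you want to see the thread hops without pulling in RxJava or Retrofit, here is a rough JDK-only analogy of the layout above. It is a sketch and not RxJava: CompletableFuture stands in for Observable, and the IO and UI executors, the thread names, and the class name ThreadHopSketch are all made up for illustration. Each "network call" picks the IO executor for its own work, and the chain hops to the simulated main thread only once, at the very end.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ThreadHopSketch {

    // Creates daemon threads that all share one name, so the output is stable.
    static ThreadFactory named(String name) {
        return runnable -> {
            Thread t = new Thread(runnable, name);
            t.setDaemon(true);
            return t;
        };
    }

    // Hypothetical stand-ins for Schedulers.io() and AndroidSchedulers.mainThread().
    static final ExecutorService IO = Executors.newCachedThreadPool(named("io"));
    static final ExecutorService UI = Executors.newSingleThreadExecutor(named("ui"));

    // Name of the thread the caller is currently running on.
    static String here() {
        return Thread.currentThread().getName();
    }

    // Each call only decides where its own work runs (the subscribeOn analogue).
    static CompletableFuture<String> networkCall1() {
        return CompletableFuture.supplyAsync(() -> "call1@" + here(), IO);
    }

    static CompletableFuture<String> networkCall2(String input) {
        return CompletableFuture.supplyAsync(() -> input + " -> call2@" + here(), IO);
    }

    // The hop to the simulated main thread happens once, at the end of the chain
    // (the observeOn analogue).
    static CompletableFuture<String> chainedCalls() {
        return networkCall1()
                .thenCompose(result1 -> networkCall2(result1))
                .thenApplyAsync(result2 -> result2 + " -> delivered@" + here(), UI);
    }

    public static void main(String[] args) {
        // prints "call1@io -> call2@io -> delivered@ui"
        System.out.println(chainedCalls().join());
    }
}
```

Both calls report the "io" thread and only the final step reports "ui", which is the same shape as putting subscribeOn() on each network call and a single observeOn() at the end of chainedCalls().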

EDIT

If you really want to have an observeOn() call on the individual network call methods you can. You would have to add an extra observeOn() to your chainedCalls() method. You can have as many observeOn() calls as you like per stream. It would be something like:

public Observable<String> networkCall1()
{
    return <NETWORK_CALL_1()>
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

public Observable<String> networkCall2( String input )
{
    return <NETWORK_CALL_2(input)>
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

public Observable<String> chainedCalls()
{
    return networkCall1()
            .observeOn(Schedulers.io())
            .flatMap( result1 -> networkCall2( result1 ) )
            .observeOn(AndroidSchedulers.mainThread());
}

Jahnold answered Sep 27 '22 19:09