Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Schedulers.io creates hundreds of RxCachedThreadSchedulers

I use RxJava in my android app and it ran into OutOfMemoryError several times. I checked it with Device Manager and I just noticed, that I have more, than 200 threads, most of them in wait state and usually those are RxCachedThreadSchedulers. The OOMError is raised becaused of having too many threads. I also noticed, that if I push a button, which invokes a service and get a token and caches it, the thread count grows by 5!

So, I googled and found, that Schedulers.io can create unlimited threads. When I replace every Schedulers.io with Schedulers.computation, the problem goes away, but that makes no sense, since I use the Schedulers.io like it is supposed to be used.

So how can I use Schedulers.io and make sure, that it does not create too many threads?

Update

I do the unsubscribing like this:

    final Scheduler.Worker worker = Schedulers.io().createWorker();
    worker.schedule(new Action0() {
        @Override
        public void call() {
            long last = lastServerCommunication.getMillis();
            LongPreference pref = new LongPreference(mSharedPreferences, PREF_KEY_LAST_SERVER_COMMUNICATION);
            pref.set(last);
            worker.unsubscribe();
        }
    });

Update #2

Regular way I use Schedulers.io are e.g:

public Observable<Scenario> load() {
    return Observable
            .create(new Observable.OnSubscribe<Scenario>() {
                @Override
                public void call(Subscriber<? super Scenario> subscriber) {
                    try {
                        Scenario scenario = mGson.fromJson(mSharedPreferences.getString("SCENARIO", null), Scenario.class);
                        subscriber.onNext(scenario);
                        subscriber.onCompleted();
                    } catch (Exception e) {
                        subscriber.onError(new Throwable());
                    }
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

And:

    mSomeSubscription = mSomeManager.readFromDatabase()
            .subscribeOn(Schedulers.io())
            .subscribe(new Observer<List<SomeEntry>>() {
                @Override
                public void onCompleted() { }

                @Override
                public void onError(Throwable e) {
                    // some logging
                }

                @Override
                public void onNext(List<SomeEntry> Entries) {
                    // Some action
                }
            });
like image 492
szidijani Avatar asked Jul 08 '15 13:07

szidijani


People also ask

What's the difference between observeOn() and subscribeOn()?

ObserveOn works only downstream. All the methods following the observeOn have been moved to the IO thread. While the methods prior to the observeOn are still in the main thread. While subscribeOn changes the thread for the calls prior to it and also the methods that follow it.

What are schedulers in RxJava?

Android Scheduler — This Scheduler is provided by rxAndroid library. This is used to bring back the execution to the main thread so that UI modification can be made. This is usually used in observeOn method.

What is RxJava in Android example?

RxJava is a JVM library that uses observable sequences to perform asynchronous and event-based programming. Its primary building blocks are triple O's, which stand for Operator, Observer, and Observables. And we use them to complete asynchronous tasks in our project. It greatly simplifies multithreading in our project.

How default is used in RxJava?

RxJava 1․ This operator does not by default operate on any particular Scheduler. There is also a new operator in RxJava 1.1 called switchIfEmpty that, rather than emitting a backup value if the source Observable terminates without having emitted any items, it emits the emissions from a backup Observable.


2 Answers

Okay, I found the reason. See the description, Update #2:

return Observable.create(new Observable.OnSubscribe<Something>() {
            @Override
            public void call(Subscriber<? super Something> subscriber) {
                try {
                  // Some action
                    subscriber.onNext(scenario);
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(new Throwable());
                }
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

When you create cold Observable sequences like this you have to make sure, that you call onCompleted on the subscribers, see above subscriber.onCompleted();. Well, It wasn't there at some places in the code, so io threads were generated.

Many thanks akarnokd for the help!

like image 112
szidijani Avatar answered Sep 24 '22 10:09

szidijani


If you use Schedulers.io().createWorker() you have to unsubscribe() the Worker once you have finished. Regular RxJava operators shouldn't leak any workers and thus threads.

like image 21
akarnokd Avatar answered Sep 25 '22 10:09

akarnokd