I use RxJava in my Android app, and it has run into an OutOfMemoryError several times. I checked it with Device Manager and noticed that I have more than 200 threads, most of them in the wait state, and usually those are RxCachedThreadScheduler threads. The OutOfMemoryError is raised because there are too many threads. I also noticed that when I push a button that invokes a service, gets a token, and caches it, the thread count grows by 5!
So I googled and found that Schedulers.io can create an unlimited number of threads. When I replace every Schedulers.io with Schedulers.computation, the problem goes away, but that makes no sense, since I use Schedulers.io the way it is supposed to be used.
So how can I use Schedulers.io and make sure that it does not create too many threads?
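To make the difference between an unbounded and a bounded pool concrete, here is a minimal sketch in plain java.util.concurrent, not RxJava itself. It assumes that Schedulers.io behaves roughly like a cached thread pool (a new thread whenever no idle one is available) and Schedulers.computation roughly like a fixed-size pool. The class name PoolGrowthSketch and the method peakThreads are made up for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthSketch {

    /** Submits `tasks` tasks that all block, then returns the largest pool size reached. */
    public static int peakThreads(ThreadPoolExecutor pool, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // simulate work that never finishes on its own
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int peak = pool.getLargestPoolSize();
        release.countDown(); // let every task finish so the pool can shut down
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak;
    }

    public static void main(String[] args) {
        // Assumption: stands in for an unbounded, io-style scheduler.
        ThreadPoolExecutor cached = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        // Assumption: stands in for a bounded, computation-style scheduler.
        ThreadPoolExecutor fixed = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        System.out.println("cached pool peak threads: " + peakThreads(cached, 50));
        System.out.println("fixed pool peak threads:  " + peakThreads(fixed, 50));
    }
}
```

With 50 tasks that never finish on their own, the cached pool should need one thread per task, while the fixed pool stays at its configured size and queues the rest. That would also explain why swapping schedulers hides the symptom without removing the cause.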
Update
I do the unsubscribing like this:
final Scheduler.Worker worker = Schedulers.io().createWorker();
worker.schedule(new Action0() {
    @Override
    public void call() {
        long last = lastServerCommunication.getMillis();
        LongPreference pref = new LongPreference(mSharedPreferences, PREF_KEY_LAST_SERVER_COMMUNICATION);
        pref.set(last);
        worker.unsubscribe();
    }
});
Update #2
Regular ways I use Schedulers.io are, e.g.:
public Observable<Scenario> load() {
    return Observable
            .create(new Observable.OnSubscribe<Scenario>() {
                @Override
                public void call(Subscriber<? super Scenario> subscriber) {
                    try {
                        Scenario scenario = mGson.fromJson(mSharedPreferences.getString("SCENARIO", null), Scenario.class);
                        subscriber.onNext(scenario);
                        subscriber.onCompleted();
                    } catch (Exception e) {
                        // Pass the original exception on instead of an empty Throwable.
                        subscriber.onError(e);
                    }
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
And:
mSomeSubscription = mSomeManager.readFromDatabase()
        .subscribeOn(Schedulers.io())
        .subscribe(new Observer<List<SomeEntry>>() {
            @Override
            public void onCompleted() { }

            @Override
            public void onError(Throwable e) {
                // some logging
            }

            @Override
            public void onNext(List<SomeEntry> Entries) {
                // Some action
            }
        });
observeOn works only downstream: the operators that follow observeOn run on the scheduler passed to it, while the operators before it stay on the thread they were already using. subscribeOn, by contrast, changes the thread on which the source is subscribed to, so it affects the calls before it and, unless a later observeOn switches threads again, the ones that follow it as well.
Android Scheduler: this Scheduler is provided by the RxAndroid library. It is used to bring execution back to the main thread so that UI modifications can be made, and it is usually passed to observeOn.
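The thread hop described above can be sketched without RxJava, as a loose analogy built on CompletableFuture. The producing step runs on one executor (similar in spirit to subscribeOn) and the consuming step is moved to another (similar in spirit to observeOn). The thread names "io-like" and "main-like" and the class ThreadHopSketch are invented for this sketch; a real Android main thread is not involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadHopSketch {

    /** Returns { name of the thread that produced the value, name of the thread that consumed it }. */
    public static String[] run() {
        // Hypothetical stand-ins for an io scheduler and a main-thread scheduler.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-like"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        try {
            return CompletableFuture
                    // Upstream work: runs on the "io-like" thread.
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // Downstream work: explicitly moved to the "main-like" thread.
                    .thenApplyAsync(
                            producer -> new String[] {producer, Thread.currentThread().getName()},
                            ui)
                    .join();
        } finally {
            io.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("produced on: " + threads[0]);
        System.out.println("consumed on: " + threads[1]);
    }
}
```

Only the step after the explicit switch runs on the second executor; everything before it stays where the source was started, which is the same upstream/downstream split the two operators give you.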
RxJava is a JVM library that uses observable sequences for asynchronous and event-based programming. Its primary building blocks are the three O's: Operators, Observers, and Observables. Together they are used to carry out asynchronous tasks, and they greatly simplify multithreading in a project.
Okay, I found the reason. See the description, Update #2:
return Observable.create(new Observable.OnSubscribe<Something>() {
            @Override
            public void call(Subscriber<? super Something> subscriber) {
                try {
                    // Some action
                    subscriber.onNext(scenario);
                    subscriber.onCompleted();
                } catch (Exception e) {
                    // Pass the original exception on instead of an empty Throwable.
                    subscriber.onError(e);
                }
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
When you create cold Observable sequences like this, you have to make sure that you call onCompleted on the subscriber (see subscriber.onCompleted(); above).
It was missing in some places in my code, so those subscriptions never terminated, their io workers were never released, and new io threads kept being created.
Many thanks akarnokd for the help!
If you use Schedulers.io().createWorker(), you have to unsubscribe() the Worker once you have finished. Regular RxJava operators shouldn't leak any workers and thus threads.
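Why an unreleased worker turns into an extra thread can be shown with a toy model. The sketch below is not RxJava's implementation: WorkerPool, acquire, and release are hypothetical names, and the assumption is simply that the io scheduler hands out single-threaded workers and reuses one only after it has been given back (the role unsubscribe() plays for a Worker, or a terminal event for a subscription).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WorkerLeakSketch {

    /** Hypothetical stand-in for a cached worker pool; not RxJava's real implementation. */
    static final class WorkerPool {
        private final Deque<ExecutorService> idle = new ArrayDeque<>();
        private final List<ExecutorService> all = new ArrayList<>();

        /** Reuses an idle worker if one exists, otherwise creates a new single-thread worker. */
        synchronized ExecutorService acquire() {
            ExecutorService worker = idle.poll();
            if (worker == null) {
                worker = Executors.newSingleThreadExecutor();
                all.add(worker);
            }
            return worker;
        }

        /** Plays the role of unsubscribe(): the worker becomes reusable again. */
        synchronized void release(ExecutorService worker) {
            idle.push(worker);
        }

        synchronized int createdCount() {
            return all.size();
        }

        synchronized void shutdownAll() {
            for (ExecutorService worker : all) {
                worker.shutdown();
            }
        }
    }

    /** Runs `requests` small tasks and reports how many workers (threads) had to be created. */
    public static int workersCreated(int requests, boolean releaseWhenDone) {
        WorkerPool pool = new WorkerPool();
        try {
            for (int i = 0; i < requests; i++) {
                ExecutorService worker = pool.acquire();
                worker.submit(() -> { }).get(); // run one small task and wait for it
                if (releaseWhenDone) {
                    pool.release(worker);
                }
            }
            return pool.createdCount();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownAll(); // clean up so the JVM can exit
        }
    }

    public static void main(String[] args) {
        System.out.println("released after use: " + workersCreated(5, true) + " worker(s)");
        System.out.println("never released:     " + workersCreated(5, false) + " worker(s)");
    }
}
```

In this model five requests need a single worker when each one is released after use, and five workers when none is released, which matches the pattern of the thread count growing with every button press.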