Background
I am using Realm within my app. Once data is loaded it undergoes intensive processing, so that processing runs on a background thread.
The coding pattern in use is the Unit of Work pattern and Realm only exists within a repository under a DataManager. The idea here is that each repository can have a different database/file storage solution.
What I have tried
Below is code similar to what I have in my FooRepository class.
The idea here is that an instance of Realm is obtained, used to query the realm for objects of interest, return them and close the realm instance. Note that this is synchronous and at the end copies the objects from Realm to an unmanaged state.
public Observable<List<Foo>> getFoosById(List<String> fooIds) {
    Realm realm = Realm.getInstance(fooRealmConfiguration);
    RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);
    for (String id : fooIds) {
        findFoosByIdQuery.equalTo(Foo.FOO_ID_FIELD_NAME, id);
        findFoosByIdQuery.or();
    }
    return findFoosByIdQuery
            .findAll()
            .asObservable()
            .doOnUnsubscribe(realm::close)
            .filter(RealmResults::isLoaded)
            .flatMap(foos -> Observable.just(new ArrayList<>(realm.copyFromRealm(foos))));
}
This code is later used in conjunction with the heavy processing code via RxJava:
dataManager.getFoosById(foo)
        .flatMap(this::processtheFoosInALongRunningProcess)
        .subscribeOn(Schedulers.io()) // could be Schedulers.computation() etc.
        .subscribe(tileChannelSubscriber);
After reading the docs, my belief is that the above should work, as it is NOT asynchronous and therefore does not need a looper thread. I obtain the instance of realm within the same thread therefore it is not being passed between threads and neither are the objects.
The problem
When the above is executed I get
Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.
This doesn't seem right. The only thing I can think of is that the pool of Realm instances is getting me an existing instance created from another process using the main thread.
Kay so
return findFoosByIdQuery
        .findAll()
        .asObservable()
This happens on the UI thread, because that's where you're calling it from initially.
.subscribeOn(Schedulers.io())
Aaaaand then you're tinkering with them on Schedulers.io(). Nope, that's not the same thread!
As much as I dislike the approach of copying from a zero-copy database, your current approach is riddled with issues due to misuse of realmResults.asObservable(), so here's a spoiler for what your code should be:
public Observable<List<Foo>> getFoosById(List<String> fooIds) {
    return Observable.defer(() -> {
        try (Realm realm = Realm.getInstance(fooRealmConfiguration)) { // try-finally also works
            RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);
            for (String id : fooIds) {
                findFoosByIdQuery.equalTo(FooFields.ID, id);
                findFoosByIdQuery.or(); // please guarantee this works?
            }
            RealmResults<Foo> results = findFoosByIdQuery.findAll();
            return Observable.just(realm.copyFromRealm(results));
        }
    }).subscribeOn(Schedulers.io());
}
Note that you are creating the Realm instance outside of your RxJava processing pipeline, and therefore on the main thread (or whichever thread you are on when calling getFoosById()).
Just because the method returns an Observable doesn't mean that it runs on another thread. Only the processing pipeline of the Observable created by the last statement of your getFoosById() method runs on the correct thread (the filter(), the flatMap() and all the processing done by the caller).
You thus have to ensure that the call of getFoosById() is already done on the thread used by Schedulers.io().
One way to achieve this is by using Observable.defer():
Observable.defer(() -> dataManager.getFoosById(foo))
        .flatMap(this::processtheFoosInALongRunningProcess)
        .subscribeOn(Schedulers.io()) // could be Schedulers.computation() etc.
        .subscribe(tileChannelSubscriber);
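To make the threading point concrete without pulling in Realm or RxJava, here is a minimal plain-Java sketch of the same situation. Everything in it is a hypothetical stand-in: ConfinedHandle only imitates a thread-confined object such as a Realm instance, and the single-thread executor plays the role of Schedulers.io(). It is an analogy under those assumptions, not Realm's or RxJava's actual behaviour. The eager variant creates the handle on the calling thread and reads it on the worker, while the deferred variant creates, uses and closes it entirely inside the task, which is roughly what wrapping the call in defer() achieves.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a thread-confined handle (not the Realm API).
final class ConfinedHandle implements AutoCloseable {
    // Remember which thread created this handle.
    private final Thread owner = Thread.currentThread();

    String read() {
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("access from incorrect thread");
        }
        return "data";
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}

final class ThreadConfinementSketch {

    // Eager: the handle is created on the caller's thread, then read on the worker.
    static String eager(ExecutorService worker) throws Exception {
        ConfinedHandle handle = new ConfinedHandle(); // runs on the calling thread
        return run(worker, handle::read);             // runs on the worker thread
    }

    // Deferred: creation, read and close all happen inside the task on the worker.
    static String deferred(ExecutorService worker) throws Exception {
        return run(worker, () -> {
            try (ConfinedHandle handle = new ConfinedHandle()) {
                return handle.read();
            }
        });
    }

    // Submits the task to the worker and reports a failure as a string.
    private static String run(ExecutorService worker, Callable<String> task) throws Exception {
        try {
            return worker.submit(task).get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            System.out.println(eager(worker));
            System.out.println(deferred(worker));
        } finally {
            worker.shutdown();
        }
    }
}
```

In this sketch only the deferred variant succeeds, because the handle never leaves the thread that created it. The same reasoning is why the copied, unmanaged list is the only thing that should cross the thread boundary.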