
RxJava Multithreading with Realm - Realm access from incorrect thread

Background

I am using Realm within my app. Once data is loaded, it undergoes intensive processing, so the processing runs on a background thread.

The code follows the Unit of Work pattern, and Realm exists only within a repository under a DataManager. The idea is that each repository can use a different database or file storage solution.

What I have tried

Below is code similar to what I have in my FooRepository class.

The idea is that an instance of Realm is obtained and used to query for the objects of interest, the objects are returned, and the Realm instance is closed. Note that this is synchronous and that, at the end, the objects are copied out of Realm into an unmanaged state.

public Observable<List<Foo>> getFoosById(List<String> fooIds) {

    Realm realm = Realm.getInstance(fooRealmConfiguration);

    RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);

    for(String id : fooIds) {

        findFoosByIdQuery.equalTo(Foo.FOO_ID_FIELD_NAME, id);
        findFoosByIdQuery.or();
    }

    return findFoosByIdQuery
            .findAll()
            .asObservable()
            .doOnUnsubscribe(realm::close)
            .filter(RealmResults::isLoaded)
            .flatMap(foos -> Observable.just(new ArrayList<>(realm.copyFromRealm(foos))));
}

This code is later used in conjunction with the heavy processing code via RxJava:

dataManager.getFoosById(foo)
            .flatMap(this::processtheFoosInALongRunningProcess)
            .subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc
            .subscribe(tileChannelSubscriber);

After reading the docs, my belief is that the above should work, as it is NOT asynchronous and therefore does not need a looper thread. I obtain the Realm instance within the same thread, so neither the instance nor the objects are being passed between threads.

The problem

When the above is executed I get

Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.

This doesn't seem right. The only thing I can think of is that the pool of Realm instances is giving me an existing instance that another process created on the main thread.

Graham Smith asked Sep 22 '16 16:09


2 Answers

Kay so

return findFoosByIdQuery
        .findAll()
        .asObservable()

This happens on the UI thread, because that's where you're calling it from initially.

.subscribeOn(Schedulers.io())

Aaaaand then you're tinkering with them on Schedulers.io().

Nope, that's not the same thread!
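
To make the failure concrete, here is a minimal stdlib-only sketch of thread confinement. `ConfinedStore` is a hypothetical stand-in for a thread-confined handle, not Realm's actual API; it just remembers the thread that created it and refuses reads from any other thread, which is roughly the check behind the error message in the question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a thread-confined handle (NOT Realm's API).
final class ConfinedStore {
    // Remember which thread created this instance.
    private final Thread owner = Thread.currentThread();

    String read() {
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("access from incorrect thread");
        }
        return "data";
    }
}

final class ConfinementDemo {
    // Creates the store on the calling thread, then reads it on a worker thread.
    // Returns true if the worker's read was rejected.
    static boolean crossThreadReadFails() {
        ConfinedStore store = new ConfinedStore();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    store.read();
                    return false;
                } catch (IllegalStateException e) {
                    return true;
                }
            }, worker).join();
        } finally {
            worker.shutdown();
        }
    }

    // Creates and reads the store on the same worker thread, which is allowed.
    static String sameThreadRead() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(() -> new ConfinedStore().read(), worker).join();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(crossThreadReadFails()); // store created here, read over there
        System.out.println(sameThreadRead());       // created and read on one thread
    }
}
```

The first call mirrors the question's situation (created on the caller, touched on the scheduler thread); the second mirrors what the fix has to achieve.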

As much as I dislike the approach of copying from a zero-copy database, your current approach is riddled with issues due to misuse of realmResults.asObservable(), so here's a spoiler for what your code should be:

public Observable<List<Foo>> getFoosById(List<String> fooIds) {
   return Observable.defer(() -> {
       try(Realm realm = Realm.getInstance(fooRealmConfiguration)) { //try-finally also works
           RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);
           for(String id : fooIds) {
               findFoosByIdQuery.equalTo(FooFields.ID, id);
               findFoosByIdQuery.or(); // please guarantee this works?
           }
           RealmResults<Foo> results = findFoosByIdQuery.findAll();
           return Observable.just(realm.copyFromRealm(results));
       }
   }).subscribeOn(Schedulers.io());
}
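
For readers without Realm or RxJava at hand, the same open, query, copy, close shape can be sketched with the JDK alone. Everything here is an assumption made for illustration: `ScopedStore` is a hypothetical thread-confined store, a `Supplier` plays the role of the deferred block, and a single-thread executor stands in for the IO scheduler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical thread-confined, closeable store (NOT Realm's API).
final class ScopedStore implements AutoCloseable {
    private final Thread owner = Thread.currentThread();
    private final List<String> rows = Arrays.asList("foo-1", "foo-2");
    private boolean closed;

    // Returns a detached copy that stays usable after the store is closed.
    List<String> copyAll() {
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("access from incorrect thread");
        }
        if (closed) {
            throw new IllegalStateException("store is closed");
        }
        return new ArrayList<>(rows);
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class DeferredLoadDemo {
    // Nothing touches the store until the supplier runs, so open, query, copy
    // and close all happen on whichever thread executes it.
    static Supplier<List<String>> loadFoos() {
        return () -> {
            try (ScopedStore store = new ScopedStore()) {
                return store.copyAll();
            }
        };
    }

    // Runs the deferred load on a worker thread and hands back the detached copy.
    static List<String> loadOnWorker() {
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(loadFoos(), io).join();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(loadOnWorker());
    }
}
```

Because only the copied list leaves the supplier, the caller never holds anything tied to the worker thread.
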
EpicPandaForce answered Nov 14 '22 08:11


Note that you are creating the Realm instance outside of your RxJava processing pipeline, and thus on the main thread (or whichever thread you are on when calling getFoosById()).

Just because the method returns an Observable doesn't mean that it runs on another thread. Only the processing pipeline of the Observable created by the last statement of your getFoosById() method runs on the correct thread (the filter(), the flatMap() and all the processing done by the caller).
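
The difference between work done while the method is being called and work done when the pipeline actually runs can be shown without RxJava. This is only a sketch under assumptions: a plain `Supplier` stands in for the Observable, an executor stands in for the scheduler, and the "query" is reduced to recording which thread it ran on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class AssemblyVsExecutionDemo {
    // Eager: the thread is captured while this method executes, like a query
    // that runs while the pipeline is still being assembled.
    static Supplier<Thread> eager() {
        Thread queryThread = Thread.currentThread();
        return () -> queryThread;
    }

    // Deferred: the thread is captured only when the supplier is invoked.
    static Supplier<Thread> deferred() {
        return () -> Thread.currentThread();
    }

    // Invokes the supplier on a worker thread and returns what it reports.
    static Thread runOnWorker(Supplier<Thread> source) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(source, worker).join();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        Thread caller = Thread.currentThread();
        // true: the eager "query" ran on the caller's thread despite the worker
        System.out.println(runOnWorker(eager()) == caller);
        // false: the deferred "query" ran on the worker thread
        System.out.println(runOnWorker(deferred()) == caller);
    }
}
```

In both cases the supplier is invoked on the worker; only the deferred version also does its real work there.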

You thus have to ensure that the call to getFoosById() is itself made on the thread used by Schedulers.io().

One way to achieve this is by using Observable.defer():

Observable.defer(() -> dataManager.getFoosById(foo))
            .flatMap(this::processtheFoosInALongRunningProcess)
            .subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc
            .subscribe(tileChannelSubscriber);
Wolfram Rittmeyer answered Nov 14 '22 08:11