When I stream copies of Realm objects instead of Realm references and subscribe on Schedulers.io(), the app crashes with the well-known exception message "Realm access from incorrect thread. Realm objects can only be accessed in the thread they were created."
Shouldn't a copy be free of thread confinement? Can I produce it on one thread and process it on a different thread?
This is how I create the observable:
public Observable<Brand> getAllBrands() {
    return realm.where(Brand.class)
            .findAll()
            .asObservable()
            .flatMap(Observable::from)
            .map(brand -> realm.copyFromRealm(brand));
}
This is how I observe getAllBrands():
Observable<Brand> brandObservable = dataManager.getAllBrands();
brandObservable.observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribe(new Observer<Brand>() {
            @Override
            public void onCompleted() {
                Log.d("reactive", "completed");
            }

            @Override
            public void onError(Throwable e) {
                Log.d("reactive", e.getMessage());
            }

            @Override
            public void onNext(Brand brand) {
                dataSource.add(brand.getName());
                myAdapter.notifyDataSetChanged();
            }
        });
You are subscribing on Schedulers.io() while using the Realm instance from the UI thread:
realm.where(Brand.class)
        .findAll()
        .asObservable()
        .flatMap(Observable::from)
        .map(brand -> realm.copyFromRealm(brand)) // realm instance on the wrong thread
        .subscribeOn(Schedulers.io());
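To make the failure mode concrete, here is a minimal plain-Java sketch of thread confinement. `ConfinedStore` is a hypothetical stand-in for a Realm instance, not Realm's API: it remembers the thread that created it and rejects calls from any other thread, which is roughly the rule the exception message describes. The copy it returns is harmless on any thread; it is the call made on the confined handle from the wrong thread that fails.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadConfinementDemo {

    // Hypothetical stand-in for a thread-confined handle such as a Realm instance.
    static final class ConfinedStore {
        private final Thread owner = Thread.currentThread();

        // Returns a detached copy, but only when called on the owning thread.
        String copyOut(String value) {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("access from incorrect thread");
            }
            return new String(value);
        }
    }

    // Calls copyOut on a separate worker thread and reports what happened.
    static String callFromWorker(ConfinedStore store) throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<String> result = worker.submit(() -> {
                try {
                    return store.copyOut("Acme");
                } catch (IllegalStateException e) {
                    return "rejected: " + e.getMessage();
                }
            });
            return result.get();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        ConfinedStore store = new ConfinedStore();  // created on the main thread
        System.out.println(store.copyOut("Acme")); // same thread: allowed
        System.out.println(callFromWorker(store)); // other thread: rejected
    }
}
```

In the question's code, the `map` lambda plays the role of `callFromWorker`: it runs on the io scheduler but captures a handle that was created on the UI thread.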
What you are after is an easy way to move a query across threads, which is still a work in progress here: https://github.com/realm/realm-java/pull/1978. Until then you can work around it by doing it yourself like this:
public Observable<Brand> getAllBrands(final Realm realm) {
    return Observable.create(new Observable.OnSubscribe<List<Brand>>() {
        @Override
        public void call(final Subscriber<? super List<Brand>> subscriber) {
            // Open a Realm instance that belongs to the subscribing thread.
            final Realm obsRealm = Realm.getInstance(realm.getConfiguration());
            final RealmResults<Brand> results = obsRealm.where(Brand.class).findAll();
            final RealmChangeListener listener = new RealmChangeListener() {
                @Override
                public void onChange() {
                    // Copy with the thread-local instance, not the caller's instance.
                    subscriber.onNext(obsRealm.copyFromRealm(results));
                }
            };
            results.addChangeListener(listener);
            subscriber.add(Subscriptions.create(new Action0() {
                @Override
                public void call() {
                    // Remove the listener from the results it was added to,
                    // then close the instance opened above.
                    results.removeChangeListener(listener);
                    obsRealm.close();
                }
            }));
        }
    })
    .flatMap(Observable::from);
}
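Stripped of the Rx and Realm specifics, the workaround follows one pattern: open the handle on the thread that does the work, copy plain data out, close the handle on that same thread, and pass only the copies along. The sketch below shows that pattern in plain Java under stated assumptions: `ConfinedStore` and its hard-coded rows are hypothetical stand-ins, not Realm's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PerThreadInstanceDemo {

    // Hypothetical thread-confined handle; stands in for a Realm instance.
    static final class ConfinedStore implements AutoCloseable {
        private final Thread owner = Thread.currentThread();
        private final List<String> rows = Arrays.asList("Acme", "Globex");

        // Returns a detached copy of the rows.
        List<String> copyAll() {
            checkThread();
            return new ArrayList<>(rows);
        }

        @Override
        public void close() {
            checkThread();
        }

        private void checkThread() {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("access from incorrect thread");
            }
        }
    }

    // Opens, reads, and closes the store entirely on the worker thread,
    // so only detached copies ever cross the thread boundary.
    static List<String> loadOnWorker() throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<List<String>> copies = worker.submit(() -> {
                try (ConfinedStore store = new ConfinedStore()) {
                    return store.copyAll();
                }
            });
            return copies.get();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> brands = loadOnWorker(); // safe to use on the calling thread
        System.out.println(brands);
    }
}
```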
Note that Realm change listeners only work on Looper threads, which means you probably need to change your worker thread to a HandlerThread:
HandlerThread bgThread = new HandlerThread("workerThread");
bgThread.start(); // getLooper() returns null until the thread has been started
Handler handler = new Handler(bgThread.getLooper());
getAllBrands(realm).subscribeOn(HandlerScheduler.from(handler));