I am using RxJava in my Android app and I want to load data from the database.
In this way, I am creating a new Observable using Observable.create()
which returns a list of EventLog
public Observable<List<EventLog>> loadEventLogs() {
return Observable.create(new Observable.OnSubscribe<List<EventLog>>() {
@Override
public void call(Subscriber<? super List<EventLog>> subscriber) {
List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
List<EventLog> eventLogs = new ArrayList<>(logs.size());
for (int i = 0; i < logs.size(); i++) {
eventLogs.add(new EventLog(logs.get(i)));
}
subscriber.onNext(eventLogs);
}
});
}
Though it works correctly, I read that using Observable.create()
is not actually a best practice for Rx Java (see here).
So I changed this method in this way.
public Observable<List<EventLog>> loadEventLogs() {
return Observable.fromCallable(new Func0<List<EventLog>>() {
@Override
public List<EventLog> call() {
List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
List<EventLog> eventLogs = new ArrayList<>(logs.size());
for (int i = 0; i < logs.size(); i++) {
eventLogs.add(new EventLog(logs.get(i)));
}
return eventLogs;
}
});
}
Is this a better approach using Rx Java? Why? What is actually the difference among the two methods?
Moreover, since the database load a list of elements, makes sense to emit the entire list at once? Or should I emit one item at a time?
The two methods may look like similar and behave similar but fromCallable
deals with the difficulties of backpressure for you whereas the create
version does not. Dealing with backpressure inside an OnSubscribe
implementation ranges from simple to outright mind-melting; however, if omitted, you may get MissingBackpressureException
s along asynchronous boundaries (such as observeOn
) or even on continuation boundaries (such as concat
).
RxJava tries to offer proper backpressure support for as much factories and operators as possible, however, there are quite a few factories and operators that can't support it.
The second problem with manual OnSubscribe
implementation is the lack of cancellation support, especially if you generate a lot of onNext
calls. Many of these can be replaced by standard factory methods (such as from
) or helper classes (such as SyncOnSubscribe
) that deal with all the complexity for you.
You may find a lot of introductions and examples that (still) use create
for two reasons.
create
proportionally instead of talking about the standard factory methods and showing how certain common tasks (such as yours) can be achieved safely.For the second question, namely if one should create a List or a sequence of values, it depends on your source. If your source supports some kind of iteration or streaming of individual data elements (such as JDBC), you can just hook onto it and emit one by one (see SyncOnSubscribe
). If it doesn't support it or you need it in List form anyway, then keep it as it is. You can always convert between the two forms via toList
and flatMapIterable
if necessary.
As it's explain in the response you linked, with Observable.create
you'll may need to violate the advanced requirements of RxJava.
For example, you'll need to implement backpressure, or how to unsubscribe.
In you case, you want to emit an item, without having to deal with backpressure or subscription. So Observable.fromCallable
is a good call. RxJava will deal with the rest.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With