I'm using RxJava to essentially collect the list of individually emitted Observables and combine them into a list of Observables (essentially sort of the opposite of flatMap). Here's my code:
// myEvent.findMemberships() returns an Observable<List<Membership>>
myEvent.findMemberships()
.flatMap(new Func1<List<Membership>, Observable<User>>() {
@Override
public Observable<User> call(List<Membership> memberships) {
List<User> users = new ArrayList<User>();
for (Membership membership : memberships) {
users.add(membership.getUser());
}
return Observable.from(users);
}
})
.toList()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<User>>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) {
Timber.e(e, "Error when trying to get memberships");
}
@Override
public void onNext(List<User> users) {
Timber.d("%d users emitted", users.size());
}
})
I notice that my onNext is never called. I can't seem to understand this. If i remove the .toList call and basically output the individual Users (as shown below) it works by emitting each item.
subscriptions //
.add(currentEvent.findMemberships()
.flatMap(new Func1<List<Membership>, Observable<User>>() {
@Override
public Observable<User> call(List<Membership> memberships) {
List<User> users = new ArrayList<User>();
for (Membership membership : memberships) {
users.add(membership.getUser());
}
return Observable.from(users);
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) {
Timber.e(e, "Error when trying to get memberships");
}
@Override
public void onNext(User user) {
Timber.d("%d users emitted", user.getName());
}
}));
Q1. Is my understanding of .toList incorrect?
Q2. How does one combing a stream of individually emitted Observable<Object>
s into a single Observable<List<Object>>
?
** EDIT
@kjones totally nailed the issue. I was not calling onComplete with my findMemberships call. I've added the code snippet below. My real use case was a little more convoluted with a bunch of more transformations which is why I needed to be using .toList call. As @zsxwing also rightly pointed out, for just this use case, a simple map will suffice.
public Observable<List<Membership>> findMemberships() {
return Observable.create(new Observable.OnSubscribe<List<Membership>>() {
@Override
public void call(Subscriber<? super List<Membership>> subscriber) {
try {
// .....
List<Membership> memberships = queryMyDb();
subscriber.onNext(memberships);
// BELOW STATEMENT FIXES THE PROBLEM ><
// subscriber.onCompleted();
} catch (SQLException e) {
// ...
}
}
});
}
It looks like the Observable returned by the myEvent.findMemberships()
call is never calling onComplete. Can you show this code?
If that is the case, it would explain the behavior you are seeing. The .toList()
will not emit the list until all items have been emitted (signaled by onComplete).
Your second version without .toList()
, would proceed as follows:
.findMemberships()
emits a single List<Membership>
.flatMap()
transforms List<Membership> into a single List<User>
Observable.from(users) creates an observable that emits each user
.subscribe()
onNext() is called for each user
onCompleted() is never called.
Your original version:
.findMemberships()
emits a single List<Membership>
.flatMap()
transforms List<Membership> into a single List<User>
Observable.from(users) creates an observable that emits each user
.toList()
buffers each User waiting for onCompleted() to be called
onCompleted is never called because the .findMemberships Observable never completes
There are several solutions:
1) Make the findMemberShips()
Observable call onComplete.This may not be desirable if the Observable returned by findMemberShips()
is a Rx Subject (PublishSubject, BehaviorSubject, etc)
2) Use Observable.just()
instead of Observable.from()
. You already have a List<User>
in .flatMap(), just return it. Using Observable.from(users)
creates an Observable that emits each user. Observable.just(users)
would create an Observable that emits a single List<User>
. No need for .toList()
.
3) Use .map()
instead of .flatMap()
. Again, no need for .toList()
. Since each List<Membership>
gets transformed into a List<User>
you only need to use .map()
.
myEvent
.findMemberships()
.map(new Func1<List<Membership>, List<User>>() {
@Override
public List<User> call(List<Membership> memberships) {
List<User> users = new ArrayList<User>();
for (Membership membership : memberships) {
users.add(membership.getUser());
}
return users;
}
})
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With