
Collecting Observables to a List doesn't seem to emit the collection at once

Tags: java, frp, rx-java

I'm using RxJava to collect individually emitted items and combine them into a single list (essentially the opposite of flatMap). Here's my code:

        // myEvent.findMemberships() returns an Observable<List<Membership>>

        myEvent.findMemberships()
             .flatMap(new Func1<List<Membership>, Observable<User>>() {
               @Override
               public Observable<User> call(List<Membership> memberships) {
                 List<User> users = new ArrayList<User>();
                 for (Membership membership : memberships) {
                   users.add(membership.getUser());
                 }
                 return Observable.from(users);
               }
             })
             .toList()
             .subscribeOn(Schedulers.newThread())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Observer<List<User>>() {
               @Override
               public void onCompleted() { }

               @Override
               public void onError(Throwable e) {
                 Timber.e(e, "Error when trying to get memberships");
               }

               @Override
               public void onNext(List<User> users) {
                 Timber.d("%d users emitted", users.size());
               }
             });

I notice that my onNext is never called, and I can't see why. If I remove the .toList() call and output the individual Users (as shown below), it works and emits each item.

subscriptions //
    .add(currentEvent.findMemberships()
             .flatMap(new Func1<List<Membership>, Observable<User>>() {
               @Override
               public Observable<User> call(List<Membership> memberships) {
                 List<User> users = new ArrayList<User>();
                 for (Membership membership : memberships) {
                   users.add(membership.getUser());
                 }
                 return Observable.from(users);
               }
             })
             .subscribeOn(Schedulers.newThread())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Observer<User>() {
               @Override
               public void onCompleted() { }

               @Override
               public void onError(Throwable e) {
                 Timber.e(e, "Error when trying to get memberships");
               }

               @Override
               public void onNext(User user) {
                 Timber.d("User emitted: %s", user.getName());
               }
             }));

Q1. Is my understanding of .toList incorrect?

Q2. How does one combine the items emitted individually by an Observable<Object> into a single Observable<List<Object>>?

EDIT:

@kjones totally nailed the issue: I was not calling onCompleted in my findMemberships call. I've added the code snippet below. My real use case was more convoluted, with several more transformations, which is why I needed the .toList() call. As @zsxwing also rightly pointed out, for just this use case a simple map will suffice.

public Observable<List<Membership>> findMemberships() {
  return Observable.create(new Observable.OnSubscribe<List<Membership>>() {
    @Override
    public void call(Subscriber<? super List<Membership>> subscriber) {
      try {
        // .....
        List<Membership> memberships = queryMyDb();

        subscriber.onNext(memberships);

        // BELOW STATEMENT FIXES THE PROBLEM ><
        // subscriber.onCompleted();

      } catch (SQLException e) {
        // ...
      }
    }
  });
}

Kaushik Gopal asked Oct 28 '14 02:10



1 Answer

It looks like the Observable returned by the myEvent.findMemberships() call never calls onCompleted. Can you show this code?

If that is the case, it would explain the behavior you are seeing. The .toList() operator will not emit the list until all items have been emitted, which is signaled by onCompleted.

Your second version, without .toList(), would proceed as follows:

.findMemberships()
    emits a single List<Membership>
.flatMap()
    transforms List<Membership> into a single List<User>
    Observable.from(users) creates an observable that emits each user
.subscribe()
    onNext() is called for each user
    onCompleted() is never called.

Your original version:

.findMemberships()
    emits a single List<Membership>
.flatMap()
    transforms List<Membership> into a single List<User>
    Observable.from(users) creates an observable that emits each user
.toList()
    buffers each User waiting for onCompleted() to be called
    onCompleted is never called because the .findMemberships Observable never completes
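
The buffering behavior traced above can be illustrated with a small plain-Java model. This is only a sketch: SimpleObserver, ToListCollector, and run are hypothetical names invented for the illustration, and none of it is RxJava's actual implementation. It only shows why a collector that waits for the completion signal hands nothing downstream when that signal never arrives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model with hypothetical names; these are NOT RxJava classes.
class ToListModel {

    /** Minimal stand-in for an Rx observer: an item signal and a completion signal. */
    interface SimpleObserver<T> {
        void onNext(T item);
        void onCompleted();
    }

    /** Buffers every item and passes the whole list downstream only on completion. */
    static final class ToListCollector<T> implements SimpleObserver<T> {
        private final List<T> buffer = new ArrayList<>();
        private final Consumer<List<T>> downstream;

        ToListCollector(Consumer<List<T>> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void onNext(T item) {
            buffer.add(item); // nothing is emitted downstream yet
        }

        @Override
        public void onCompleted() {
            downstream.accept(new ArrayList<>(buffer)); // emit the list once
        }
    }

    /** Feeds all users to the collector; signals completion only if requested. */
    static List<List<String>> run(List<String> users, boolean callOnCompleted) {
        List<List<String>> received = new ArrayList<>();
        ToListCollector<String> collector = new ToListCollector<>(received::add);
        for (String user : users) {
            collector.onNext(user);
        }
        if (callOnCompleted) {
            collector.onCompleted();
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("ann", "bob", "cy");
        System.out.println("without onCompleted: " + run(users, false));
        System.out.println("with onCompleted:    " + run(users, true));
    }
}
```

Without the completion signal the downstream consumer receives nothing; with it, the consumer receives exactly one list containing all three users.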

There are several solutions:

1) Make the findMemberships() Observable call onCompleted. This may not be desirable if the Observable returned by findMemberships() is an Rx Subject (PublishSubject, BehaviorSubject, etc.).

2) Use Observable.just() instead of Observable.from(). You already have a List<User> in .flatMap(), so just return it. Observable.from(users) creates an Observable that emits each user individually, whereas Observable.just(users) creates an Observable that emits a single List<User> (the function passed to .flatMap() then returns Observable<List<User>>). No need for .toList().

3) Use .map() instead of .flatMap(). Again, no need for .toList(). Since each List<Membership> gets transformed into a List<User>, you only need .map().

myEvent
    .findMemberships()
    .map(new Func1<List<Membership>, List<User>>() {
        @Override
        public List<User> call(List<Membership> memberships) {
            List<User> users = new ArrayList<User>();
            for (Membership membership : memberships) {
                users.add(membership.getUser());
            }
            return users;
        }
    })
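
The body of call() above is an ordinary list-to-list transformation, so it can be pulled out and checked on its own. The sketch below assumes a toolchain with Java 8 streams (which may not hold for the Android setup in the question). The Membership and User classes here are hypothetical minimal stand-ins for the question's types, and toUsers is an invented helper name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class MembershipMapping {

    // Hypothetical minimal stand-ins for the question's domain types.
    static final class User {
        private final String name;
        User(String name) { this.name = name; }
        String getName() { return name; }
    }

    static final class Membership {
        private final User user;
        Membership(User user) { this.user = user; }
        User getUser() { return user; }
    }

    /** Same transformation as the for-loop in call(): one User per Membership, in order. */
    static List<User> toUsers(List<Membership> memberships) {
        return memberships.stream()
                .map(Membership::getUser)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Membership> memberships = Arrays.asList(
                new Membership(new User("ann")),
                new Membership(new User("bob")));
        for (User user : toUsers(memberships)) {
            System.out.println(user.getName());
        }
    }
}
```

With a helper like this, the function passed to .map() would only need to return toUsers(memberships).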
kjones answered Nov 13 '22 09:11
