
How to merge items from flatMapIterable

Tags: java, rx-java

I have an infinite observable (a db observer) that emits finite sequences of items. Each sequence needs to be processed and then emitted back as a group.

The problem is how to write this so that toList waits for the sequence generated by flatMapIterable to finish, rather than for the original source:

DbObservable.map(new Func1<Query, List<Item>>() {
        @Override
        public List<Item> call(Query query) {
            return query.items; // get items from db query
        }
    })
    .flatMapIterable(new Func1<List<Item>, Iterable<Item>>() {
        @Override
        public Iterable<Item> call(List<Item> items) {
            return items;
        }
    })
    .flatMap(/*process*/)
    .toList() // regenerate the List that was passed in to flatMapIterable
    // subscribe and emit list of items

Nothing reaches the subscribers, because toList is stuck waiting for DbObservable's onCompleted, which never arrives.

mewa asked Feb 29 '16 22:02


1 Answer

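toList() emits its list only when it receives onCompleted(). Here it sits downstream of the infinite DbObservable, so the outer chain, including flatMap(/*process*/), never completes and the list is never emitted.

So you need to apply the processing and toList() inside a new flatMap(), on the finite Observable created for each list. That inner Observable completes after its last item, so toList() emits once per query: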

db.map(q -> q.items)
    .flatMap(items -> Observable.from(items) // finite Observable, one per query
        .flatMap(/*process*/)
        .toList() // completes when this query's items are done
    )
    .subscribe(...)
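
The same shape can be tried without RxJava on the classpath. The sketch below is only a plain-Java model using java.util.stream, not RxJava itself. The class name PerBatchCollect, the methods infiniteBatches and processBatch, and the "multiply by 10" processing step are all hypothetical, chosen for illustration. It shows why collecting inside the per-batch scope finishes even though the outer source never does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java model of the per-batch pattern (java.util.stream, not RxJava).
// All names here are hypothetical and exist only for illustration.
class PerBatchCollect {

    // Stand-in for the infinite DbObservable: cycles through two finite batches forever.
    static Stream<List<Integer>> infiniteBatches() {
        List<List<Integer>> batches = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 5));
        return Stream.iterate(0, i -> (i + 1) % batches.size())
                .map(batches::get);
    }

    // Processes one finite batch and regroups it. The collect() ends because the
    // batch ends, whether or not the outer source ever does.
    static List<Integer> processBatch(List<Integer> batch,
                                      Function<Integer, Integer> process) {
        return batch.stream().map(process).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // limit(3) only stops the demo; each list is complete as soon as
        // its own batch has been processed.
        infiniteBatches()
                .map(batch -> processBatch(batch, x -> x * 10))
                .limit(3)
                .forEach(System.out::println);
    }
}
```

Running main prints [10, 20, 30], then [40, 50], then [10, 20, 30]. Flattening the infinite stream first and collecting afterwards would never return, which mirrors the toList() placed on the outer chain in the question.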
André Mion answered Sep 21 '22 12:09