I have an infinite Observable (a DB observer) that emits finite sequences of items, which need to be processed and then emitted back as a group.
The problem is how to write it so that toList() waits for the sequence generated by flatMapIterable to finish, rather than for the original source:
DbObservable.map(new Func1<Query, List<Item>>() {
        @Override
        public List<Item> call(Query query) {
            return query.items; // get items from db query
        }
    })
    .flatMapIterable(new Func1<List<Item>, Iterable<Item>>() {
        @Override
        public Iterable<Item> call(List<Item> items) {
            return items;
        }
    })
    .flatMap(/*process*/)
    .toList() // regenerate the List that was passed in to flatMapIterable
    // subscribe and emit list of items
Nothing reaches the subscribers because toList() is stuck waiting for DbObservable's onCompleted().
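toList() waits for the onCompleted() event, but the chain after flatMap(/*process*/) completes only when the infinite DbObservable itself completes, which never happens.
So you need to apply these operators inside a new flatMap(), where each inner Observable is finite: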
db.map(q -> q.items)
    .flatMap(items -> Observable.from(items) // finite inner Observable, one per emitted list
        .flatMap(/*process*/)
        .toList() // completes when this inner sequence completes
    )
    .subscribe(...)
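The scoping idea can also be seen without RxJava. The following is a minimal plain-Java sketch, not RxJava code: it uses only the standard library, and the names PerBatchCollect, process, processBatch and simulate are hypothetical and do not come from the question. Each emitted list is processed and regrouped on its own, so the subscriber gets a result per emission and never has to wait for the source to finish.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class PerBatchCollect {
    // Hypothetical per-item processing step (stands in for /*process*/).
    static String process(String item) {
        return item.toUpperCase();
    }

    // Processes one finite batch and regroups the results into a list.
    // This plays the role of the inner from(items).flatMap(...).toList().
    static List<String> processBatch(List<String> batch) {
        return batch.stream()
                .map(PerBatchCollect::process)
                .collect(Collectors.toList());
    }

    // Simulates a few emissions from a source that never signals completion:
    // each batch is processed and handed to the subscriber immediately.
    static List<List<String>> simulate(List<List<String>> emissions) {
        List<List<String>> received = new ArrayList<>();
        Consumer<List<String>> subscriber = received::add;
        for (List<String> batch : emissions) {
            subscriber.accept(processBatch(batch));
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<String>> received = simulate(List.of(
                List.of("a", "b"),
                List.of("c")));
        System.out.println(received); // prints [[A, B], [C]]
    }
}
```

Because the collection step sits inside the per-batch function, its lifetime is tied to one list rather than to the whole stream, which is the same reason the inner toList() in the answer completes.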