I'm using Retrofit to get bookmarks from REST API:
public interface BookmarkService {
@GET("/bookmarks")
Observable<List<Bookmark>> bookmarks();
}
Now I would like to emit each item from this list with delay.
I did something similar to this in Java, but onCompleted
is never triggered.
private Observable<Bookmark> getBookmarks() {
return getBookmarkService().bookmarks()
.flatMap(new Func1<List<Bookmark>, Observable<Bookmark>>() {
@Override
public Observable<Bookmark> call(List<Bookmark> bookmarks) {
Observable<Bookmark> resultObservable = Observable.never();
for (int i = 0; i < bookmarks.size(); i++) {
List<Bookmark> chunk = bookmarks.subList(i, (i + 1));
resultObservable = resultObservable.mergeWith(Observable.from(chunk).delay(1000 * i, TimeUnit.MILLISECONDS));
}
return resultObservable;
}
})
.observeOn(AndroidSchedulers.mainThread());
}
What I'm doing wrong?
Usage:
mSwipeRefreshLayout.setRefreshing(true);
getBookmarks()
.subscribe(new Observer<Bookmark>() {
@Override
public void onCompleted() {
Timber.i("Completed");
mSwipeRefreshLayout.setRefreshing(false);
}
@Override
public void onError(Throwable e) {
Timber.i("Error: %s", e.toString());
mSwipeRefreshLayout.setRefreshing(false);
}
@Override
public void onNext(Bookmark bookmark) {
Timber.i("Bookmark: %s", bookmark.toString());
mBookmarksAdapter.addItem(bookmark);
}
});
Geek Tip #2: When we have a use-case where we wish to execute the work first and then postpone the emission for a specific amount of time, we may use the Delay Operator. Let’s take a look at the RxJava Interval Operator, which generates an Observable that emits a sequence of integers separated by a time interval.
Let's start with the Timer Operator of RxJava. Timer operator is used when we want to do something after a span of time that we specify. Let's understand Timer operator with an example. Here as we have passed 2 seconds into the Timer operator, it will go into the flatMap operator after 2 seconds.
We can understand RxJava as data emitted by one component, called Observable, and the underlying structure provided by the Rx libraries will propagate changes to another component, Observer. Simply put, it’s an API for asynchronous programming with observable streams.
RxJava — Multi-Threading in Android helps to understand the basics of Rx, everything about Observable s, Observer s, Scheduler s, etc. So, hoping that you already know about basics of RxJava lets start by discussing Observable. What is an Observable? In RxJava, Observable s are the source that emits data to the Observers.
As you use a merge operation, onCompleted will be call if all Observables are completed. but Observable.never()
will never complete. Use Observable.empty()
instead.
According to your code, your want to emit sublist with delay. The sublist contains only one element
What you can do : flatmap your list, to emit each items. Buffer it to build a list from items, then use a delay.
private Observable<Bookmark> getBookmarks() {
return getBookmarkService().bookmarks()
.flatMap((bookmarks) -> Observable.from(bookmarks)
.buffer(1)
.scan(new Pair(0, null), (ac, value) -> new Pair(acu.index + 1, value)
.flatMap(pair -> Observable.just(pair.value).delay(pair.index, SECONDS))
.observeOn(AndroidSchedulers.mainThread());
}
it might work (not tested)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With