I have to poll some RESTful endpoint periodically to refresh my android app's data. I also have to pause and resume it based on connectivity (if the phone is offline, there's no need to even try). My current solution is working, but it uses standard Java's ScheduledExecutorService
to perform periodic tasks, but I'd like to stay in Rx paradigm.
Here's my current code, parts of which are skipped for brevity.
userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() { @Override public void call(final Subscriber<? super UserProfile> subscriber) { final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); final Runnable runnable = new Runnable() { @Override public void run() { // making http request here } }; final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1); networkStatusObservable.subscribe(new Action1<Boolean>() { @Override public void call(Boolean networkAvailable) { if (!networkAvailable) { pause(); } else { pause(); futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS)); } } private void pause() { for (ScheduledFuture<?> future : futures) { future.cancel(true); } futures.clear(); } }); final Subscription subscription = new Subscription() { private boolean isUnsubscribed = false; @Override public void unsubscribe() { scheduledExecutorService.shutdownNow(); isUnsubscribed = true; } @Override public boolean isUnsubscribed() { return isUnsubscribed; } }; subscriber.add(subscription); } }).multicast(BehaviorSubject.create()).refCount();
networkStatusObservable
is basically a broadcast receiver wrapped into Observable<Boolean>
, indicating that the phone is connected to the network.
As I said, this solution is working, but I want to use Rx approach for periodic polling and emitting new UserProfile
s, because there are numerous problems with scheduling things manually, which I want to avoid. I know about Observable.timer
and Observable.interval
, but can't figure out how to apply them to this task (and I'm not sure if I need to use those at all).
There are a few approaches on this GitHub issue that you might find helpful.
https://github.com/ReactiveX/RxJava/issues/448
The three implementations are:
Observable.interval
Observable.interval(delay, TimeUnit.SECONDS).timeInterval() .flatMap(new Func1<Long, Observable<Notification<AppState>>>() { public Observable<Notification<AppState>> call(Long seconds) { return lyftApi.updateAppState(params).materialize(); } });
Scheduler.schedulePeriodically
Observable.create({ observer -> Schedulers.newThread().schedulePeriodically({ observer.onNext("application-state-from-network"); }, 0, 1000, TimeUnit.MILLISECONDS); }).take(10).subscribe({ v -> println(v) });
Manual Recursion
Observable.create(new OnSubscribeFunc<String>() { @Override public Subscription onSubscribe(final Observer<? super String> o) { return Schedulers.newThread().schedule(0L, new Func2<Scheduler, Long, Subscription>() { @Override public Subscription call(Scheduler inner, Long t2) { o.onNext("data-from-polling"); return inner.schedule(t2, this, 1000, TimeUnit.MILLISECONDS); } }); } }).toBlockingObservable().forEach(new Action1<String>() { @Override public void call(String v) { System.out.println("output: " + v); } });
And the conclusion is that manual recursion is the way to go because it waits until the operation is completed before scheduling the next execution.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With