Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Rx Observable emitting values periodically

Tags:

I have to poll some RESTful endpoint periodically to refresh my android app's data. I also have to pause and resume it based on connectivity (if the phone is offline, there's no need to even try). My current solution is working, but it uses standard Java's ScheduledExecutorService to perform periodic tasks, but I'd like to stay in Rx paradigm.

Here's my current code, parts of which are skipped for brevity.

userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() {     @Override     public void call(final Subscriber<? super UserProfile> subscriber) {         final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();         final Runnable runnable = new Runnable() {             @Override             public void run() {                 // making http request here             }         };         final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1);         networkStatusObservable.subscribe(new Action1<Boolean>() {             @Override             public void call(Boolean networkAvailable) {                 if (!networkAvailable) {                     pause();                 } else {                     pause();                                             futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS));                 }             }              private void pause() {                 for (ScheduledFuture<?> future : futures) {                     future.cancel(true);                 }                 futures.clear();             }         });          final Subscription subscription = new Subscription() {             private boolean isUnsubscribed = false;              @Override             public void unsubscribe() {                 scheduledExecutorService.shutdownNow();                 isUnsubscribed = true;             }              @Override             public boolean isUnsubscribed() {                 return isUnsubscribed;             }         };         subscriber.add(subscription);     } }).multicast(BehaviorSubject.create()).refCount(); 

networkStatusObservable is basically a broadcast receiver wrapped into Observable<Boolean>, indicating that the phone is connected to the network.

As I said, this solution is working, but I want to use Rx approach for periodic polling and emitting new UserProfiles, because there are numerous problems with scheduling things manually, which I want to avoid. I know about Observable.timer and Observable.interval, but can't figure out how to apply them to this task (and I'm not sure if I need to use those at all).

like image 418
Haspemulator Avatar asked Jul 03 '14 15:07

Haspemulator


1 Answers

There are a few approaches on this GitHub issue that you might find helpful.

https://github.com/ReactiveX/RxJava/issues/448

The three implementations are:


Observable.interval

Observable.interval(delay, TimeUnit.SECONDS).timeInterval()         .flatMap(new Func1<Long, Observable<Notification<AppState>>>() {             public Observable<Notification<AppState>> call(Long seconds) {                 return lyftApi.updateAppState(params).materialize(); } }); 

Scheduler.schedulePeriodically

Observable.create({ observer ->         Schedulers.newThread().schedulePeriodically({             observer.onNext("application-state-from-network");         }, 0, 1000, TimeUnit.MILLISECONDS);     }).take(10).subscribe({ v -> println(v) }); 

Manual Recursion

Observable.create(new OnSubscribeFunc<String>() {         @Override         public Subscription onSubscribe(final Observer<? super String> o) {             return Schedulers.newThread().schedule(0L, new Func2<Scheduler, Long, Subscription>() {                 @Override                 public Subscription call(Scheduler inner, Long t2) {                     o.onNext("data-from-polling");                     return inner.schedule(t2, this, 1000, TimeUnit.MILLISECONDS);                 }             });         }     }).toBlockingObservable().forEach(new Action1<String>() {         @Override         public void call(String v) {             System.out.println("output: " + v);         }     }); 

And the conclusion is that manual recursion is the way to go because it waits until the operation is completed before scheduling the next execution.

like image 102
Robert Estivill Avatar answered Sep 20 '22 15:09

Robert Estivill