
Queuing tasks with RxJava in Android

I'm developing an Android application with background data synchronization. I currently use RxJava to post data to the server at regular intervals. In addition, I'd like to give the user a "force sync" button that triggers a sync immediately. I know how to use Observable.interval() to push data at regular intervals, and I know how to use Observable.just() to push the forced one, but I'd like to queue them up if one is triggered while the previous one is still running.

Take an example where the automatic sync interval is 1 min and a sync lasts 40 sec (I'm exaggerating here just to make the point easier). If the user presses the "force" button while the automatic sync is still running (or vice versa: the automatic sync triggers while the forced one is still running), I'd like to queue the second sync request so that it starts as soon as the first one finishes.

I've drawn this image, which may put it in perspective:

[image: sync timeline]

As you can see, the automatic sync is triggered (by some Observable.interval()), and in the middle of syncing the user presses the "force" button. We now want to wait for the first request to finish and then start the forced request. At some point while the forced request is running, a new automatic request is triggered, which just adds it to the queue. After the last request in the queue finishes, everything stops, and the automatic sync is scheduled again a little later.
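The timeline described above can be sketched as a small calculation: with a single worker and first-come-first-served ordering, each request starts at the later of its trigger time and the finish time of the previous sync. The class and method names below are hypothetical, and the trigger times (0 s, 20 s, 60 s) are assumed values chosen to match the 1 min / 40 sec example.

```java
import java.util.ArrayList;
import java.util.List;

public class SyncTimeline {
    /** Returns "trigger->start..finish" entries for requests served one at a time, in arrival order. */
    public static List<String> schedule(long[] triggerTimesSec, long durationSec) {
        List<String> out = new ArrayList<>();
        long freeAt = 0; // time at which the previous sync finishes
        for (long trigger : triggerTimesSec) {
            long start = Math.max(trigger, freeAt); // wait if a sync is still running
            long finish = start + durationSec;
            out.add(trigger + "->" + start + ".." + finish);
            freeAt = finish;
        }
        return out;
    }

    public static void main(String[] args) {
        // automatic at 0 s, forced at 20 s, automatic again at 60 s; each sync takes 40 s
        System.out.println(schedule(new long[] {0, 20, 60}, 40));
    }
}
```

Running main prints [0->0..40, 20->40..80, 60->80..120]: the forced request waits until 40 s, and the second automatic request waits until 80 s.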

I hope somebody can point me to the correct operator for this. I tried Observable.combineLatest(), but the queued list was dispatched at the beginning, and when I added a new sync to the queue, it did not continue once the previous operation completed.

Any help is greatly appreciated, Darko

Darko Smoljo asked Apr 11 '16 10:04

2 Answers

You can do this by merging the timer with the button-click Observable/Subject, using the queueing effect of onBackpressureBuffer, and applying concatMap to the processing, which makes sure the tasks run one at a time.

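import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

// emits a value each time the user forces a sync
PublishSubject<Long> subject = PublishSubject.create();

// automatic trigger, once per second
Observable<Long> periodic = Observable.interval(1, 1, TimeUnit.SECONDS);

periodic.mergeWith(subject)
.onBackpressureBuffer() // queues triggers that arrive while a task is still running
.concatMap(new Func1<Long, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(Long v) {
        // simulates the task to run; concatMap starts the next one only after this completes
        return Observable.just(1)
                .delay(300, TimeUnit.MILLISECONDS);
    }
}
).subscribe(System.out::println, Throwable::printStackTrace);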

Thread.sleep(1100);
// user clicks a button
subject.onNext(-1L);

Thread.sleep(800);
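As a rough illustration of the same queueing idea without RxJava, the sketch below uses only the JDK: a BlockingQueue plays the role of the buffer, and a single worker thread takes one request at a time, much as concatMap runs one task at a time. The class name, the request labels, and the "stop" sentinel are hypothetical, and this is not what RxJava does internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SyncQueueSketch {
    private static final String STOP = "stop"; // hypothetical sentinel that ends the worker

    /** Puts the requests into one queue and processes them one at a time on a worker thread. */
    public static List<String> run(List<String> requests) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // plays the role of the buffer
        List<String> log = new ArrayList<>();                      // written only by the worker

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String request = queue.take(); // blocks until a request is available
                    if (request.equals(STOP)) {
                        return;
                    }
                    log.add(request + " started");
                    Thread.sleep(30);              // simulates the task to run
                    log.add(request + " finished");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        try {
            for (String request : requests) {
                queue.put(request); // both the timer and the button would call this
            }
            queue.put(STOP);
            worker.join();          // after join, the worker's writes to log are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", e);
        }
        return log;
    }

    public static void main(String[] args) {
        run(List.of("auto-0", "forced", "auto-1")).forEach(System.out::println);
    }
}
```

Because there is one consumer and the queue is first-in-first-out, every "finished" entry appears before the next "started" entry, whatever the timing of the producers.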
akarnokd answered Oct 17 '22 22:10


Although there is an accepted answer with a good solution, I would like to share another option that uses a Scheduler backed by a single-thread executor:

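import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Scheduler;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

public static void main(String[] args) throws Exception {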
    System.out.println(" init ");
    Observable<Long> numberObservable =
            Observable.interval(700, TimeUnit.MILLISECONDS).take(10);

    final Subject<Long, Long> subject = PublishSubject.create();

    // all emissions are delivered on this one thread, so they are handled one at a time
    Executor executor = Executors.newSingleThreadExecutor();
    Scheduler scheduler = Schedulers.from(executor);
    numberObservable.observeOn(scheduler).subscribe(subject);

    subject.subscribe(onNextFunc("subscriber 1"), onErrorFunc("subscriber 1"),
                    onCompleteFunc("subscriber 1"));

    Thread.sleep(800);
    //simulate action
    executor.execute(new Runnable() {
        @Override
        public void run() {
            subject.onNext(333L);
        }
    });

    Thread.sleep(5000);
}

static Action1<Long> onNextFunc(final String who) {
    return new Action1<Long>() {
        public void call(Long x) {
            System.out.println(who + " got " + x + " :: " + Thread.currentThread().getName()
                    + " -- " + System.currentTimeMillis());
            try {
                //simulate some work
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
}

static Action1<Throwable> onErrorFunc(final String who) {
    return new Action1<Throwable>() {
        public void call(Throwable t) {
            t.printStackTrace();
        }
    };
}

static Action0 onCompleteFunc(final String who) {
    return new Action0() {
        public void call() {
            System.out.println(who + " complete");
        }
    };
}
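The single-thread idea can also be sketched with the JDK alone, assuming a ScheduledExecutorService is acceptable in place of the RxJava scheduler. The periodic sync and the forced sync are both submitted to the same one-thread executor, so a forced sync that arrives mid-run simply waits its turn. The class name, the counters, and the timings (70 ms period, 50 ms of simulated work) are hypothetical values picked for the demo.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleThreadSyncSketch {
    // one thread for everything: a task can only start after the previous one has returned
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger maxRunning = new AtomicInteger();
    private final AtomicInteger automaticDone = new AtomicInteger();
    private final AtomicInteger forcedDone = new AtomicInteger();

    private void sync(AtomicInteger doneCounter) {
        // record how many syncs are in flight; this should never exceed 1
        maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
        pause(50); // simulate some work
        running.decrementAndGet();
        doneCounter.incrementAndGet();
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs a short scenario and returns {max concurrent syncs, automatic syncs, forced syncs}. */
    public static int[] demo() {
        SingleThreadSyncSketch s = new SingleThreadSyncSketch();
        // automatic sync every 70 ms, starting immediately
        s.executor.scheduleAtFixedRate(
                () -> s.sync(s.automaticDone), 0, 70, TimeUnit.MILLISECONDS);
        pause(20);
        // "force sync" pressed, most likely while the first automatic sync is still running
        s.executor.execute(() -> s.sync(s.forcedDone));
        pause(300);
        // with the default policies, shutdown cancels the periodic task
        // but still runs work that was already queued
        s.executor.shutdown();
        try {
            s.executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {s.maxRunning.get(), s.automaticDone.get(), s.forcedDone.get()};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("max concurrent = " + r[0]
                + ", automatic = " + r[1] + ", forced = " + r[2]);
    }
}
```

The number of automatic syncs depends on timing, but the maximum number of concurrent syncs stays at 1 because only one thread ever runs them.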
kalin answered Oct 17 '22 23:10