Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to implement a sequence of consecutive operations using rxJava?

Tags:

rx-java

I have the download process that consists of 3 consecutive operations: preProcess, downloading, postProcess. Each operation has asynchronous nature (preProcess calls API, downloading waits file to be downloaded etc). UI have to display which operations is executing (eg. "preparing...", "downloading...", "unpacking..."). I see whole process as Observable that emits current status of whole operation. Each operation is also an observable, that emits his status in the start of executions and completes after execution.

    Observable.OnSubscribe<DownloadStatus>() {
        @Override
        public void call(Subscriber<? super DownloadStatus> subscriber) {
            subscriber.onNext(DownloadStatus.PRE_PROCESS);
            doPreProcess()
                    .subscribe(new Action1<File>() {
                        @Override
                        public void call(File file) {
                            subscriber.onCompleted();
                        }
                    });
        }
    });

    Observable<DownloadStatus> mDonwloadingOperation = Observable.create(new Observable.OnSubscribe<DownloadStatus>() {
        @Override
        public void call(final Subscriber<? super DownloadStatus> subscriber) {
            subscriber.onNext(DownloadStatus.DOWNLOADING);
            doDownloading()
                    .subscribe(new Action1<File>() {
                        @Override
                        public void call(File file) {
                            subscriber.onCompleted();
                        }
                    });
        }
    });

    Observable<DownloadStatus> mPosProcessOperation = Observable.create(new Observable.OnSubscribe<DownloadStatus>() {
        @Override
        public void call(Subscriber<? super DownloadStatus> subscriber) {
            subscriber.onNext(DownloadStatus.POST_PROCESS);
            doPostProcess()
                    .subscribe(new Action1<File>() {
                        @Override
                        public void call(File file) {
                            subscriber.onCompleted();
                        }
                    });
        }
    });

On the one hand each operation should wait until previous operations completes. On the other hand subscriber need to receive each emitted status (eg. PRE_PROCESS -> DOWNLOADING -> POST_PROCESS -> onComplete)

I cannot use merge because each operation should depend on completion of previous one. I cannot use flatMap because i don't know how to propagate emitted status. I think that Subject could be the solution, but i also don't know how to propagate emitted status.

How can I solve such problem with rxJava? Thank for any ideas/clues.

like image 306
MyDogTom Avatar asked Dec 25 '22 23:12

MyDogTom


1 Answers

concat is what you need. This subscribes to the concatenated observable once the preceding one has finished.

concatMap also works like flatMap but concatenates the flattened projections. There's a nice diagram here on the difference between those two.

like image 197
James World Avatar answered Jan 31 '23 02:01

James World