
RxJava - upload files sequentially - emit next item, when onNext called

I have a method that uploads multiple files to cloud storage concurrently. It looks something like this:

List<String> files = Arrays.asList("file0", "file1", "file2");

Observable.from(files)
        .flatMap(file -> uploadFile(file)
                .flatMap(done -> notifyFinished(file)))
        .subscribe(this::onNext, this::onError, this::onCompleted);


private Observable<Boolean> uploadFile(String file) {
    Timber.d("Uploading: " + file);
    return Observable.just(true).delay(6, TimeUnit.SECONDS);
}

private Observable<Boolean> notifyFinished(String file) {
    Timber.d("Notify finished: " + file);
    return Observable.just(true).delay(3, TimeUnit.SECONDS);
}

The output of this is:

06-09 02:10:04.779 D: Uploading: file0
06-09 02:10:04.780 D: Uploading: file1
06-09 02:10:04.781 D: Uploading: file2
06-09 02:10:10.782 D: Notify finished: file1
06-09 02:10:10.782 D: Notify finished: file0
06-09 02:10:10.783 D: Notify finished: file2
06-09 02:10:13.784 D: onNext
06-09 02:10:13.786 D: onNext
06-09 02:10:13.786 D: onNext
06-09 02:10:13.787 D: onCompleted

I want to make it work sequentially, e.g.:

1) Uploading: file0
2) Notify finished: file0
3) onNext
4) Uploading: file1
5) Notify finished: file1
6) onNext
   ...

Is it possible to do something like this with Rx?

EDIT

Replacing the first flatMap with concatMap did the job. I thought I knew the difference between those operators, but this example just showed that I know nothing... Now the output is:

06-09 02:15:00.581 D: Uploading: file0
06-09 02:15:06.584 D: Notify finished: file0
06-09 02:15:09.586 D: onNext
06-09 02:15:09.587 D: Uploading: file1
06-09 02:15:15.590 D: Notify finished: file1
06-09 02:15:18.593 D: onNext
06-09 02:15:18.595 D: Uploading: file2
06-09 02:15:24.598 D: Notify finished: file2
06-09 02:15:27.599 D: onNext
06-09 02:15:27.601 D: onCompleted

rafakob asked Jun 08 '16 22:06



2 Answers

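If you want the files processed strictly in order, one after another, use concatMap() instead of flatMap(). flatMap subscribes to every inner observable as soon as its source item arrives and merges the results, so the uploads overlap; concatMap subscribes to the next inner observable only after the previous one has completed.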
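
The ordering difference can be illustrated without RxJava at all. The following is only an analogy built on the JDK's CompletableFuture, not how RxJava implements either operator: the class SequentialUploadSketch, the helper step, and the shortened delays (200 ms and 100 ms instead of 6 s and 3 s) are all assumptions made for the sketch. runConcurrently starts every upload-then-notify pipeline at once, roughly like flatMap; runSequentially starts the next pipeline only when the previous one has finished, roughly like concatMap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SequentialUploadSketch {

    // Hypothetical stand-in for uploadFile/notifyFinished:
    // logs the label right away, then completes after delayMs.
    static CompletableFuture<Boolean> step(String label, long delayMs,
                                           List<String> log, ScheduledExecutorService timer) {
        log.add(label);
        CompletableFuture<Boolean> done = new CompletableFuture<>();
        timer.schedule(() -> done.complete(true), delayMs, TimeUnit.MILLISECONDS);
        return done;
    }

    // One pipeline per file: upload, then notify, then report "onNext".
    static CompletableFuture<Boolean> uploadThenNotify(String file, List<String> log,
                                                       ScheduledExecutorService timer) {
        return step("Uploading: " + file, 200, log, timer)
                .thenCompose(ok -> step("Notify finished: " + file, 100, log, timer))
                .thenApply(ok -> {
                    log.add("onNext");
                    return ok;
                });
    }

    // flatMap-like: every pipeline is started immediately, then we wait for all of them.
    static List<String> runConcurrently(List<String> files) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            List<CompletableFuture<Boolean>> all = new ArrayList<>();
            for (String file : files) {
                all.add(uploadThenNotify(file, log, timer));
            }
            CompletableFuture.allOf(all.toArray(new CompletableFuture<?>[0])).join();
        } finally {
            timer.shutdown();
        }
        return new ArrayList<>(log);
    }

    // concatMap-like: the next pipeline is started only after the previous one completes.
    static List<String> runSequentially(List<String> files) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<Boolean> chain = CompletableFuture.completedFuture(true);
            for (String file : files) {
                chain = chain.thenCompose(ok -> uploadThenNotify(file, log, timer));
            }
            chain.join();
        } finally {
            timer.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("file0", "file1", "file2");
        System.out.println("concurrent: " + runConcurrently(files));
        System.out.println("sequential: " + runSequentially(files));
    }
}
```

In the sequential run the log repeats the pattern "Uploading", "Notify finished", "onNext" once per file, which is the order asked for in the question; in the concurrent run all uploads are logged before any pipeline reaches "onNext".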

Intae Kim answered Sep 23 '22 09:09


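Create one observable per file and concatenate them with concat, which subscribes to each source only after the previous one has completed:

@Test
public void testConcat() {

    // Pass the upload observables to concat directly; wrapping each one in
    // Observable.just(...) would emit the observables themselves as items.
    Observable.concat(uploadFile(file1),
                      uploadFile(file2),
                      uploadFile(file3))
              .flatMap(file -> notifyFinished(file))
              .subscribe(this::onNext, this::onError, this::onCompleted);
}

For this to compile, uploadFile has to emit the file (an Observable<String>) instead of a Boolean, so that the emitted value can be passed on to notifyFinished.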

merge and zip can also combine observables, although neither of them runs the sources one after another. There are more examples of combining observables here: https://github.com/politrons/reactive/tree/master/src/test/java/rx/observables/combining

paul answered Sep 26 '22 09:09