I have a method that uploads concurrently multiple files to the cloud storage. It looks something like this:
List<String> files = Arrays.asList("file0", "file1", "file2");
Observable.from(files)
.flatMap(file -> uploadFile(file)
.flatMap(done -> notifyFinished(file)))
.subscribe(this::onNext, this::onError, this::onCompleted);
private Observable<Boolean> uploadFile(String file) {
Timber.d("Uploading: " + file);
return Observable.just(true).delay(6, TimeUnit.SECONDS);
}
private Observable<Boolean> notifyFinished(String file) {
Timber.d("Notify finished: " + file);
return Observable.just(true).delay(3, TimeUnit.SECONDS);
}
The output of this is:
06-09 02:10:04.779 D: Uploading: file0
06-09 02:10:04.780 D: Uploading: file1
06-09 02:10:04.781 D: Uploading: file2
06-09 02:10:10.782 D: Notify finished: file1
06-09 02:10:10.782 D: Notify finished: file0
06-09 02:10:10.783 D: Notify finished: file2
06-09 02:10:13.784 D: onNext
06-09 02:10:13.786 D: onNext
06-09 02:10:13.786 D: onNext
06-09 02:10:13.787 D: onCompleted
I want to make it work sequentially, eg:
1) Uploading: file0
2) Notify finished: file0
3) onNext
4) Uploading: file1
5) Notify finished: file1
6) onNext
...
Is it possible to do something like this with Rx?
EDIT
Replacing first flatMap
with concatMap
did the job. I thought that I knew the difference between those operators, but this example just shown that I know nothing... Now the output is:
06-09 02:15:00.581 D: Uploading: file0
06-09 02:15:06.584 D: Notify finished: file0
06-09 02:15:09.586 D: onNext
06-09 02:15:09.587 D: Uploading: file1
06-09 02:15:15.590 D: Notify finished: file1
06-09 02:15:18.593 D: onNext
06-09 02:15:18.595 D: Uploading: file2
06-09 02:15:24.598 D: Notify finished: file2
06-09 02:15:27.599 D: onNext
06-09 02:15:27.601 D: onCompleted
After a Subscriber calls an Observable 's subscribe method, the Observable calls the Subscriber's Observer. onNext(T) method to emit items. A well-behaved Observable will call a Subscriber's Observer. onCompleted() method exactly once or the Subscriber's Observer.
x API for RxJava3. RxJava is a popular library for composing asynchronous and event based programs using observable sequences for the Java VM.
An Observable is like a speaker that emits a value. It does some work and emits some values. An Operator is like a translator which translates/modifies data from one form to another form. An Observer gets those values.
RxJava is a Java library that enables Functional Reactive Programming in Android development. It raises the level of abstraction around threading in order to simplify the implementation of complex concurrent behavior.
if you want 'ordered' consecutive sequential, just use concatMap()
instead of flatMap()
Create an observable per file and concat the three observables
@Test
public void testContact() {
Observable.concat(Observable.just(uploadFile(file1)),
Observable.just(uploadFile(file2)),
Observable.just(uploadFile(file3)))
.flatMap(file -> notifyFinished(file)))
.subscribe(this::onNext, this::onError, this::onCompleted);
}
You will have to make the method notifyFinished return the observable file instead boolean.
You can also use merge or zip, you have more examples of combining observables here https://github.com/politrons/reactive/tree/master/src/test/java/rx/observables/combining
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With