
How to chain two Completable in RxJava2

I have two Completables. I would like the following scenario: if the first Completable reaches onComplete, continue with the second Completable. The final result would be the onComplete of the second Completable.

This is how I do it when I have a Single, getUserIdAlreadySavedInDevice(), and a Completable, login():

    @Override
    public Completable loginUserThatIsAlreadySavedInDevice(String password) {
        return getUserIdAlreadySavedInDevice()
                .flatMapCompletable(s -> login(password, s));
    }
Mladen Rakonjac asked Mar 08 '17 20:03



2 Answers

You are looking for the andThen operator.

Returns a Completable that first runs this Completable and then the other completable.

    firstCompletable
        .andThen(secondCompletable)

In general, this operator is a "replacement" for a flatMap on Completable:

    Completable       andThen(CompletableSource next)
    <T> Maybe<T>      andThen(MaybeSource<T> next)
    <T> Observable<T> andThen(ObservableSource<T> next)
    <T> Flowable<T>   andThen(Publisher<T> next)
    <T> Single<T>     andThen(SingleSource<T> next)
Maksim Ostrovidov answered Oct 02 '22 07:10



TL;DR: the other answers miss a subtlety. Use doThingA().andThen(doThingB()) if you want the equivalent of concat; use doThingA().andThen(Completable.defer(() -> doThingB())) if you want the equivalent of flatMap.


EDIT: a more complete reference

  • flatMap() is the mapping version of merge()
  • concatMap() is the mapping version of concat()
  • For a Completable you need defer() to make function calls lazy, as they are in the mapping functions for Single or Observable. Preferably, write your functions so that nothing happens until subscribe: this is a good convention, followed by the official Rx libraries and by every Rx extension I've encountered. (For advanced users: this refers to cold completables only, but most people can ignore that.)
  • the only difference between concat(a, b) and a.andThen(b) is syntax

Some examples:

  • foo(a).andThen(bar(b)) will:

    1. call foo(a)
    2. immediately call bar(b), even if the completable returned by step 1 later emits an error
    3. subscribe to whatever step 1 returns
    4. subscribe to the result of bar(b) only if the previous step completed successfully
  • foo(a).andThen(Completable.defer(() -> bar(b))) will:

    1. call foo(a)
    2. subscribe to the result of step 1
    3. call bar(b) only if the completable returned by foo(a) succeeds
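
The two sequences above can be reproduced without RxJava, because the difference comes from Java evaluating method arguments before the call is made. The sketch below is a model, not RxJava code: Task is a hypothetical stand-in for Completable, and its andThen and defer only imitate the subscription order described above. The names foo, bar, and LOG are likewise made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class EagerVsDeferred {

    // Hypothetical stand-in for Completable: nothing runs until subscribe() is called.
    interface Task {
        void subscribe() throws Exception;

        // Imitates andThen: subscribe to this task, then to next; an error stops the chain.
        default Task andThen(Task next) {
            return () -> {
                this.subscribe();
                next.subscribe();
            };
        }

        // Imitates Completable.defer: the supplier runs at subscription time.
        static Task defer(Supplier<Task> supplier) {
            return () -> supplier.get().subscribe();
        }
    }

    static final List<String> LOG = new ArrayList<>();

    // Logs when it is called; the returned task always fails when subscribed.
    static Task foo() {
        LOG.add("foo called");
        return () -> {
            LOG.add("foo subscribed");
            throw new IllegalStateException("validation failed");
        };
    }

    // Logs when it is called, and again when the returned task is subscribed.
    static Task bar() {
        LOG.add("bar called");
        return () -> LOG.add("bar subscribed");
    }

    // Builds the chain (assembly), subscribes to it, and returns what was logged.
    private static List<String> trace(Supplier<Task> assemble) {
        LOG.clear();
        Task chain = assemble.get();
        try {
            chain.subscribe();
        } catch (Exception e) {
            LOG.add("error: " + e.getMessage());
        }
        return new ArrayList<>(LOG);
    }

    // bar() is an ordinary argument, so Java evaluates it while the chain is built.
    static List<String> eager() {
        return trace(() -> foo().andThen(bar()));
    }

    // bar() sits inside the supplier, so it runs only if the chain gets that far.
    static List<String> deferred() {
        return trace(() -> foo().andThen(Task.defer(() -> bar())));
    }

    public static void main(String[] args) {
        System.out.println("eager:    " + eager());
        System.out.println("deferred: " + deferred());
    }
}
```

In the eager run the log contains "bar called" even though foo's task fails; in the deferred run bar() is never called at all.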

I'm going to leave out the treatment of merge() since it gets a bit more complicated, but long story short that's the one to call if you want "parallelism".


The above answers are sort of correct, but I found them misleading because they miss a subtlety about eager evaluation.

doThingA().andThen(doThingB()) will call doThingB() immediately, but will only subscribe to the Completable returned by doThingB() once the Completable returned by doThingA() completes.

doThingA().andThen(Completable.defer(() -> doThingB())) will call doThingB() only after thing A has completed.

This is important only if doThingB() has side effects before a subscribe event, e.g. Single.just(sideEffect(1)).toCompletable().

An implementation that doesn't have side effects before the subscribe event (a true cold observable) might be Single.just(1).doOnSuccess(i -> sideEffect(i)).toCompletable().

In the case that has just bitten me, thing A is some validation logic and doThingB() immediately kicks off an asynchronous database update that completes a Vert.x ObservableFuture. This is bad. Arguably doThingB() should be written to update the database only upon subscribe, and I'm going to try to design things that way in the future.

Sparky answered Oct 02 '22 07:10
