Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using concatMap to run a Single before running another observable

Android Studio 3.1 RC 2
kotlin 1.2.30

The signature of the fetchMessage in Java

Single<Response> fetchMessage(final String Id); 

The kotlin code

fun translate(Id: String): Completable {
        return repository.fetchMessage(Id)
                .flatMap {
                    Single.fromCallable<State>({
                        update(messageId, it, State.COMPLETED)
                        State.COMPLETED
                    })
                }
                .onErrorReturn({
                    update(Id, null, State.ERROR)
                    State.ERROR
                })
                .toCompletable()
    }

The method I want to run before fetchMessage

 fun insertMessage(Id: String): Completable {
        return Completable.fromCallable {
            insert(Id, State.IDLE)
        }
    }

I want the insertMessage e to somehow run before the fetchMessage. I was thinking of using concatMap but not sure how to combine the translate and insertMessage. So that the insertMessage will run first then once completed the translate will run.

Many thanks for any suggestions,

Update solution 1 using startWith(..):

By changing the return to Single for the translate method. I have done like this:

fun translate(Id: String): Single<State> {
        return repository.fetchMessage(Id)
                .flatMap {
                    Single.fromCallable<State>({
                        update(messageId, it, State.COMPLETED)
                        State.COMPLETED
                    })
                }
                .onErrorReturn({
                    update(Id, null, State.ERROR)
                    State.ERROR
                })
    }

Then I can have a method to do the following insertMessage(..) -> translate(..):

translate(Id).toCompletable().startWith(insertMessage(id, State.IDLE))

Would that be an ideal solution?

Update solution 2 using concatWith(..):

My returning a Observable and calling toObservable() in the chain.

fun translate(Id: String): Observable<State> {
        return repository.fetchMessage(Id)
                .flatMap {
                    Single.fromCallable<State>({
                        update(messageId, it, State.COMPLETED)
                        State.COMPLETED
                    })
                }
                .onErrorReturn({
                    update(Id, null, State.ERROR)
                    State.ERROR
                })
                .toObservable()
    }

And I can use concatWith so the sequence would be insertMessage(..) -> translate(..):

translate(Id).toCompletable().concatWith(insertMessage(id, State.IDLE).toObservable())
    .toCompletable()

Are these correct solutions?

like image 607
ant2009 Avatar asked Mar 19 '18 16:03

ant2009


2 Answers

If you have a Completable, you can chain any other reactive type with it via andThen:

insertMessage("id")
.andThen(translate("id"))
like image 128
akarnokd Avatar answered Oct 10 '22 14:10

akarnokd


Both your options make sense, but I would recommend you to clean them a bit.

First of all you need to clearly understand what return type to use in each case: Observable, Single or Completable.

The definition is the following:

  • Single represent Observable that emit single value or error.
  • Completable represent Observable that emits no value, but only terminal events, either onError or onCompleted.

In both your cases you don't need any data returned, all you need is to know if the operation is successful or not. Completable is designed to handle exactly this case.

So I'd recommend you to have:

fun translate(Id: String): Completable {
    return repository.fetchMessage(Id)
             .flatMapCompletable {
                Completable.fromAction {
                  update(messageId, it, State.COMPLETED)
                }
             }.doOnError {
                update(Id, null, State.ERROR)
             }
}

fun insertMessage(Id: String): Completable {
    return Completable.fromCallable {
        insert(Id, State.IDLE)
    }
}

Nice option to make your code cleaner is to use Completable.fromAction instead of Completable.fromCallable, so you don't need to return anything.

Then you can use any of your options, startWith or concatWith. Both wait until first observable completes before running the second observable. I prefer to use concatWith, because it runs the functions in the same order that they are written.

Eventually we get an elegant solution:

insertMessage(id).concatWith(translate(id))

or

translate(id).startWith(insertMessage(id))

More info about concat: http://reactivex.io/documentation/operators/concat.html

Here is the implementation of the functions inside the rxJava library if you are curious:

public final Completable startWith(Completable other) {
    requireNonNull(other);
    return concat(other, this);
}

public final Completable concatWith(Completable other) {
    requireNonNull(other);
    return concat(this, other);
}

As you can see the only difference is the order.

like image 41
TpoM6oH Avatar answered Oct 10 '22 15:10

TpoM6oH