Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Conditionally Chain Single and Completable

My overall workflow for the Rx calls should work as follows (regardless of the current Rx code):

  • Get a list of motion sensor readings from a Room Dao (with the purpose of uploading them to a REST API). I'm using a Single<List<Reading>> for this
  • If that readings list is empty, then perform a jobFinished() callback and execute nothing after this
  • If readings is not empty, then chain a network call to this Single. The network call returns a Completable
  • The Single never logically throws an error, since it either fetches an empty or a non-empty readings list
  • When the entire Rx call chain is terminated, perform the jobFinished() callback
  • On the success of the entire Rx call chain, delete those readings from the Dao
  • On success of the Single, but error of the Completable, update the readings in the Dao

My current code is as follows:

  Single.create<List<Reading>> {
        readings = readingDao.getNextUploadBatch()

        if (readings.isEmpty()) {
            jobFinished(job, false)
            return@create
        }

        it.onSuccess(readings)
    }
            .flatMapCompletable { api.uploadSensorReadings(it) }
            .doOnTerminate {
                jobFinished(job, !readingDao.isEmpty())
            }
            .subscribeOn(rxSchedulers.network)
            .observeOn(rxSchedulers.database)
            .subscribe(
                    {
                        readingDao.delete(*readings.toTypedArray())
                    },
                    {
                        markCurrentReadingsAsNotUploading()
                    }
            )



The logical problem with the above code is (haven't tested it in runtime, but it compiles) that:

  • I want to cut off the code starting from the flatMapCompletable if readings list is empty
  • I do not want doOnTerminate to execute if readings is empty
  • I do not want the onComplete part (the first {} block) of subscribe to execute unless readings was non-empty, and the Completable returned a success as well
  • I do not want the onError part (the second {} block) of subscribe to execute unless readings was non-empty, and the Completable failed

I'm not sure how to implement my workflow as an efficient and neat Rx call chain. Any suggestions would be dearly welcome!

like image 493
Daksh Avatar asked Mar 02 '26 08:03

Daksh


1 Answers

If you want to perform something different depending on a value, think of flatMap:

Single.fromCallable(() -> readingDao.getNextUploadBatch())
.subscribeOn(rxSchedulers.network)
.flatMapCompletable(readings -> {
    if (readings.isEmpty()) {
        jobFinished(job, false);
        return Completable.complete();
    }
    return api.uploadSensorReadings(readings)
           .doFinally(() -> jobFinished(job, !readingDao.isEmpty()))
           .observeOn(rxSchedulers.database)
           .doOnComplete(() -> readingDao.delete(readings.toTypedArray()))
})
.subscribe(() -> /* ignored */, error -> markCurrentReadingsAsNotUploading());
like image 96
akarnokd Avatar answered Mar 05 '26 01:03

akarnokd



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!