My overall workflow for the Rx calls should work as follows (regardless of the current Rx code):
Room Dao (with the purpose of uploading them to a REST API). I'm using a Single<List<Reading>> for thisreadings list is empty, then perform a jobFinished() callback and execute nothing after thisreadings is not empty, then chain a network call to this Single. The network call returns a CompletableSingle never logically throws an error, since it either fetches an empty or a non-empty readings listjobFinished() callbackreadings from the DaoSingle, but error of the Completable, update the readings in the DaoMy current code is as follows:
Single.create<List<Reading>> {
readings = readingDao.getNextUploadBatch()
if (readings.isEmpty()) {
jobFinished(job, false)
return@create
}
it.onSuccess(readings)
}
.flatMapCompletable { api.uploadSensorReadings(it) }
.doOnTerminate {
jobFinished(job, !readingDao.isEmpty())
}
.subscribeOn(rxSchedulers.network)
.observeOn(rxSchedulers.database)
.subscribe(
{
readingDao.delete(*readings.toTypedArray())
},
{
markCurrentReadingsAsNotUploading()
}
)
The logical problem with the above code is (haven't tested it in runtime, but it compiles) that:
flatMapCompletable if readings list is emptydoOnTerminate to execute if readings is emptyonComplete part (the first {} block) of subscribe to execute unless readings was non-empty, and the Completable returned a success as wellonError part (the second {} block) of subscribe to execute unless readings was non-empty, and the Completable failedI'm not sure how to implement my workflow as an efficient and neat Rx call chain. Any suggestions would be dearly welcome!
If you want to perform something different depending on a value, think of flatMap:
Single.fromCallable(() -> readingDao.getNextUploadBatch())
.subscribeOn(rxSchedulers.network)
.flatMapCompletable(readings -> {
if (readings.isEmpty()) {
jobFinished(job, false);
return Completable.complete();
}
return api.uploadSensorReadings(readings)
.doFinally(() -> jobFinished(job, !readingDao.isEmpty()))
.observeOn(rxSchedulers.database)
.doOnComplete(() -> readingDao.delete(readings.toTypedArray()))
})
.subscribe(() -> /* ignored */, error -> markCurrentReadingsAsNotUploading());
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With