Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Converting loop with condition into RxJava stream

I have code that does blocking operation in while loop (downloads some data from a server). Client does not know how many items are going to be returned in each step. Loop breaks when N items are downloaded.

val n = 10
val list = ArrayList<T>()

while (list.size < n) {
    val lastItemId = list.last()?.id ?: 0
    val items = downloadItems(lastItemId)
    list.addAll(items)
}

downloadItems performs blocking HTTP call and returns list. Now let's assume downloadItems changes and new return type is Observable<Item>. How could I change the code to use RxJava without performing something like blockingGet?

like image 812
x2bool Avatar asked Oct 30 '25 10:10

x2bool


1 Answers

You could use repeatUntil to achieve this:

var totalItems = 0    
var id = 0
Observable.fromCallable {
            downloadItems(id)
        }
        .flatMap {
            list ->
                totalItems += list.size
                id = list.last()?.id ?: 0
                Observable.just(list)
        }
        .repeatUntil({totalItems > n})
        .subscribe({result -> System.out.println(result) })
like image 57
Anatolii Avatar answered Nov 02 '25 00:11

Anatolii