
Equivalent of RxJava .toList() in Kotlin coroutines flow

I have a situation where I need to observe userIds then use those userIds to observe users. Either userIds or users could change at any time and I want to keep the emitted users up to date. Here is an example of the sources of data I have:


data class User(val name: String)

fun observeBestUserIds(): Flow<List<String>> {
    return flow {
        emit(listOf("abc", "def"))
        delay(500)
        emit(listOf("123", "234"))
    }
}

fun observeUserForId(userId: String): Flow<User> {
    return flow {
        emit(User("${userId}_name"))
        delay(2000)
        emit(User("${userId}_name_updated"))
    }
}

In this scenario I want the emissions to be:

[User(abc_name), User(def_name)], then

[User(123_name), User(234_name)], then

[User(123_name_updated), User(234_name_updated)]

I think I can achieve this in RxJava like this:

observeBestUserIds().concatMapSingle { ids ->
    Observable.fromIterable(ids)
        .concatMap { id ->
            observeUserForId(id)
        }
        .toList()
}

What function would I write to make a flow that emits that?

Carson Holzheimer asked Mar 09 '20 14:03



2 Answers

I believe you're looking for combine, which gives you an array that you can easily call toList() on:

observeBestUserIds().collectLatest { ids ->
    combine(
        ids.map { id -> observeUserForId(id) }
    ) {
        it.toList()
    }.collect {
        println(it)
    } 
}

And here's the inner part with more explicit parameter names since you can't see the IDE's type hinting on Stack Overflow:

combine(
    ids.map { id -> observeUserForId(id) }
) { arrayOfUsers: Array<User> ->
    arrayOfUsers.toList()
}.collect { listOfUsers: List<User> ->
    println(listOfUsers)
}

Output:

[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]

Live demo (note that in the demo, all the output appears at once, but this is a limitation of the demo site - the lines appear with the timing you'd expect when the code is run locally)

This avoids emitting (abc_name_updated, def_name_updated), as discussed in the original question, because collectLatest cancels the previous block as soon as a new list of ids arrives. However, there is still an intermediate emission with 123_name_updated and 234_name: 123_name_updated is emitted first, and combine immediately emits the combined list because it holds the latest value from each flow.
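To make the intermediate emission easier to see in isolation, here is a minimal sketch with two made-up sources. The names sourceA, sourceB and combinedLabels are hypothetical and not part of the question; the delays are chosen so the two updates arrive at clearly different times.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical sources: each emits immediately, then updates at a different time.
fun sourceA(): Flow<String> = flow {
    emit("a1")
    delay(100)
    emit("a2")
}

fun sourceB(): Flow<String> = flow {
    emit("b1")
    delay(300)
    emit("b2")
}

// combine waits until every source has emitted once, then re-emits whenever
// any source produces a new value, pairing it with the latest value of the other.
fun combinedLabels(): Flow<String> =
    combine(sourceA(), sourceB()) { a, b -> a + b }

fun main() = runBlocking {
    println(combinedLabels().toList()) // [a1b1, a2b1, a2b2]
}
```

The middle value, a2b1, is the same kind of intermediate emission as [User(name=123_name_updated), User(name=234_name)] above: one source has updated and the other has not yet.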

However, this can be avoided by debouncing the emissions (on my machine, a timeout as small as 1ms works, but I used 20ms to be conservative):

observeBestUserIds().collectLatest { ids ->
    combine(
        ids.map { id -> observeUserForId(id) }
    ) {
        it.toList()
    }.debounce(timeoutMillis = 20).collect {
        println(it)
    }
}

which gets you the exact output you wanted:

[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]

Live demo
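Since the question asks for a function that returns a flow rather than a collector, the same idea could be sketched with flatMapLatest instead of collectLatest. This is only a sketch under a few assumptions: observeBestUsers is a hypothetical name, the opt-in annotation is needed because flatMapLatest is marked experimental, and the empty-list branch is there because, as far as I know, combine over an empty collection completes without emitting anything.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

data class User(val name: String)

// Same sample sources as in the question.
fun observeBestUserIds(): Flow<List<String>> = flow {
    emit(listOf("abc", "def"))
    delay(500)
    emit(listOf("123", "234"))
}

fun observeUserForId(userId: String): Flow<User> = flow {
    emit(User("${userId}_name"))
    delay(2000)
    emit(User("${userId}_name_updated"))
}

// Hypothetical helper: switches to a fresh combined flow whenever the ids change,
// cancelling the user flows that belonged to the previous ids.
@OptIn(ExperimentalCoroutinesApi::class)
fun observeBestUsers(): Flow<List<User>> =
    observeBestUserIds().flatMapLatest { ids ->
        if (ids.isEmpty()) {
            // Assumption: emit an empty list explicitly instead of relying on combine.
            flowOf(emptyList<User>())
        } else {
            combine(ids.map { id -> observeUserForId(id) }) { users -> users.toList() }
        }
    }

fun main() = runBlocking {
    observeBestUsers().collect { users -> println(users) }
}
```

The lists keep the order of the ids, because combine passes the values in the same order as the flows it was given. An intermediate list may still appear when the users update one after another; appending debounce to the returned flow, as shown above, should suppress it.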

Ryan M answered Sep 25 '22 09:09


This is unfortunately non-trivial with the current state of Kotlin Flow; some important operators seem to be missing. But please note that you are not looking for RxJava's toList(). If you tried to do this with toList and concatMap in RxJava, you would have to wait until all observables complete. This is not what you want.
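Flow's own toList() terminal operator behaves the same way, which is why it does not help here either. A minimal sketch using the question's observeUserForId: the call suspends until the flow completes and only then returns every value at once.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

data class User(val name: String)

// Same sample source as in the question.
fun observeUserForId(userId: String): Flow<User> = flow {
    emit(User("${userId}_name"))
    delay(2000)
    emit(User("${userId}_name_updated"))
}

fun main() = runBlocking {
    // Suspends for about two seconds, until the flow has completed,
    // then returns all emitted values in a single list.
    val users: List<User> = observeUserForId("abc").toList()
    println(users) // [User(name=abc_name), User(name=abc_name_updated)]
}
```

With a source that never completes, such as a database observer, this call would never return.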

Unfortunately for you I think there is no way around a custom function.

It would have to aggregate all the results returned by observeUserForId for all the ids you pass to it. It would also not be a simple windowing function, since in practice one observeUserForId flow may already have emitted twice while another has not produced a value yet. So checking whether you already have the same number of users as the ids you passed into your aggregating function isn't enough; you also have to group by user id.

I'll try to add code later today.

Edit: As promised, here is my solution. I took the liberty of augmenting the requirements slightly: the flow emits every time all userIds have values and an underlying user changes. I think this is more likely what you want, since users probably don't change properties in lockstep.

Nevertheless, if this is not what you want, leave a comment.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


data class User(val name: String)

fun observeBestUserIds(): Flow<List<String>> {
    return flow {
        emit(listOf("abc", "def"))
        delay(500)
        emit(listOf("123", "234"))
    }
}

fun observeUserForId(userId: String): Flow<User> {
    return flow {
        emit(User("${userId}_name"))
        delay(2000)
        emit(User("${userId}_name_updated"))
    }
}

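// Keeps the latest value per key and emits a snapshot once every key has reported at least once.
// Note: flatMapMerge collects at most DEFAULT_CONCURRENCY (16 by default) inner flows at a time.
inline fun <reified K, V> buildMap(keys: Set<K>, crossinline valueFunc: (K) -> Flow<V>): Flow<Map<K, V>> = flow {
    val keysSize = keys.size
    val valuesMap = HashMap<K, V>(keys.size)
    flowOf(*keys.toTypedArray())
            // Tag each value with its key so updates can be grouped by key.
            .flatMapMerge { key -> valueFunc(key).map { v -> Pair(key, v) } }
            .collect { (key, value) ->
                valuesMap[key] = value
                // Emit a copy only when every key has a value.
                if (valuesMap.keys.size == keysSize) {
                    emit(valuesMap.toMap())
                }
            }
}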

fun observeUsersForIds(): Flow<List<User>> {
    return observeBestUserIds().flatMapLatest { ids -> buildMap(ids.toSet(), ::observeUserForId as (String) -> Flow<User>) }
            .map { m -> m.values.toList() }
}


fun main() = runBlocking {
    observeUsersForIds()
        .collect { users ->
            println(users)
        }
}

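This prints the following. The first line lists def before abc because the values come from a HashMap, whose iteration order does not follow the order of the ids: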

[User(name=def_name), User(name=abc_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]

You can run the code online here

Lukasz answered Sep 25 '22 09:09