I am trying to implement a cache-then-network strategy for my API call using Kotlin Flows.
Here is what I am trying right now:
flowOf(
    remoteDataSource.getDataFromCache() // suspending function returning Flow<Data>
        .catch { error -> Timber.e(error) },
    remoteDataSource.getDataFromServer() // suspending function returning Flow<Data>
).flattenConcat().collect {
    Timber.i("Response Received")
}
The problem here is that collect is only called when getDataFromServer returns. My expectation is that I should get the first event from the cache and then a second event from the server a few milliseconds later. In this case "Response Received" gets printed twice, but immediately one after the other.
In this other variant, "Response Received" only gets printed once, after getDataFromServer() returns.
remoteDataSource.getDataFromCache() // suspending function returning Flow<Data>
    .catch { error -> Timber.e(error) }
    .flatMapConcat {
        remoteDataSource.getDataFromServer() // suspending function returning Flow<Data>
    }
    .collect {
        Timber.i("Response Received")
    }
I was using RxJava's Flowable.concat() before and it was working perfectly. Is there something in Kotlin Flows which can emulate that behaviour?
Recently, the merge operator was added to Kotlin coroutines in version 1.3.3. Here is the merged PR.
Using the merge operator, you should be able to get the result as and when it arrives.
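A minimal sketch of how merge could be used here. The names cached(), server() and cacheAndNetwork() are hypothetical stand-ins for the question's data source, and the delays only simulate latency:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins: non-suspending functions that return cold flows.
fun cached(): Flow<String> = flow {
    delay(10)  // simulated cache lookup
    emit("cache")
}

fun server(): Flow<String> = flow {
    delay(300) // simulated network round trip
    emit("server")
}

// merge collects both flows concurrently and forwards each value as it arrives.
fun cacheAndNetwork(): Flow<String> = merge(cached(), server())

fun main(): Unit = runBlocking {
    cacheAndNetwork().collect { println("Response Received: $it") }
}
```

Note that merge makes no ordering promise: if the server happened to answer before the cache, its value would be delivered first.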
"Problem here is collect is only called when getDataFromServer returns."
The first problematic thing with your design is that the Flow-returning function is also suspendable. That's two layers of suspendability. Functions should return flows without any delays and the flows themselves should emit items as they come in. If you followed this guideline, your initial code would already work.
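To illustrate the difference between the two shapes, here is a rough sketch. Both function names and the delays are assumptions made up for the illustration, not the asker's real data source:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the question's shape: the caller is suspended
// for the whole (simulated) network call before it even receives a Flow.
suspend fun getFromServerSuspending(): Flow<String> {
    delay(300) // simulated latency, paid before the function returns
    return flowOf("server")
}

// Recommended shape: returns a cold Flow at once; the latency happens
// inside the flow, only when somebody collects it.
fun getFromServer(): Flow<String> = flow {
    delay(300) // simulated latency, paid during collection
    emit("server")
}

fun main(): Unit = runBlocking {
    val cold = getFromServer()           // returns immediately, nothing has run yet
    val late = getFromServerSuspending() // suspends here until the "response" is ready
    println(cold.toList() + late.toList())
}
```

With the first shape, an expression such as flowOf(getCached(), getFromServer()) has to evaluate both arguments, and therefore wait for the server, before flowOf is even called. That matches the behaviour described in the question.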
The way you wrote these functions, they can still work if you write this:
flow<String> {
    emitAll(getCached())
    emitAll(getFromServer())
}
This statement completes immediately, returning a cold flow. When you call collect on it, it first calls getCached() and emits the cached value, and then calls getFromServer() and emits the server response.
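As a runnable sketch of this sequential variant: the bodies of getCached() and getFromServer() below are invented placeholders with simulated delays, and cacheThenNetwork() is a hypothetical wrapper name:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Placeholder implementations that keep the question's shape:
// suspending functions that return a Flow.
suspend fun getCached(): Flow<String> {
    delay(10)  // simulated cache lookup
    return flowOf("cache")
}

suspend fun getFromServer(): Flow<String> {
    delay(300) // simulated network round trip
    return flowOf("server")
}

// Building the flow does not call either function; that happens on collect.
fun cacheThenNetwork(): Flow<String> = flow {
    emitAll(getCached())     // the cached value reaches the collector first
    emitAll(getFromServer()) // the server call starts only after that
}

fun main(): Unit = runBlocking {
    cacheThenNetwork().collect { println("Response Received: $it") }
}
```

Because the suspending calls now run inside the flow builder, the collector sees the cached value before the server call has even begun.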
The above solution starts the server call only after you consume the cached value. If you need the two flows to be active concurrently, use flattenMerge.
Assuming you fixed the above basic problem and made your Flow-returning functions non-suspending, all you need is this:
flowOf(getCached(), getFromServer()).flattenMerge()
If for some reason you can't do that, you have to add the emitAll wrapper around each call:
flowOf(
    flow { emitAll(getCached()) },
    flow { emitAll(getFromServer()) }
).flattenMerge()
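A small self-contained sketch of this wrapper variant, again with invented placeholder bodies and simulated delays. flattenMerge is an opt-in API in kotlinx.coroutines, and the marker annotation has differed between library versions, so both are listed in the opt-in:

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Placeholder suspending functions, as in the question.
suspend fun getCached(): Flow<String> {
    delay(100) // simulated cache lookup
    return flowOf("cache")
}

suspend fun getFromServer(): Flow<String> {
    delay(300) // simulated network round trip
    return flowOf("server")
}

// Each suspending call is deferred into its own cold flow, so flowOf()
// returns at once and flattenMerge can collect both flows concurrently.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun cacheAndNetwork(): Flow<String> = flowOf(
    flow { emitAll(getCached()) },
    flow { emitAll(getFromServer()) }
).flattenMerge()

fun main(): Unit = runBlocking {
    cacheAndNetwork().collect { println("Response Received: $it") }
}
```

Here both calls start as soon as collection begins, so values are delivered in the order they become available rather than in the order the flows are listed.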