Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kotlin Flow execute two API calls in parallel and collect each result as it arrives

I am trying to implement cache then network strategy for my API call using Kotlin Flows. Here is what I am trying right now

flowOf(
 remoteDataSource.getDataFromCache() // suspending function returning Flow<Data>
   .catch { error -> Timber.e(error) },
 remoteDataSource.getDataFromServer() // suspending function returning Flow<Data>
).flattenConcat().collect {
  Timber.i("Response Received")
}

Problem here is collect is only called when getDataFromServer returns. My expectation is that I should get first event from cache and then second event from server after a few milliseconds. In this case "Response Received"gets printed twice but immediately one after other.

In this other variant "Response Received" only gets printed once that is after getDataFromServer() returns.

 remoteDataSource.getDataFromCache() // suspending function returning Flow<Data>
  .catch { error -> Timber.e(error) }
  .flatMapConcat {
    remoteDataSource.getDataFromServer() // suspending function returning Flow<Data>
  }
  .collect {
    Timber.i("Response Received")
  }

I was using RxJava's Flowable.concat() before and it was working perfectly. Is there something in Kotlin Flows which can emulate that behaviour?

like image 268
Abhishek Bansal Avatar asked Feb 12 '20 17:02

Abhishek Bansal


People also ask

What is buffer in kotlin flow?

The buffer function actually creates a second coroutine that allows the flow and collect functions to execute concurrently. Without the call to buffer , each element must be done with the collect before flow can continue with the next element, because both functions are executed by the same coroutine.

Is kotlin flow collect blocking?

Flow is an idiomatic way in kotlin to publish sequence of values. While the flow itself suspendable, the collector will block the coroutine from proceeding further.

How do you collect kotlin flow?

Collecting from a flow To get all the values in the stream as they're emitted, use collect . You can learn more about terminal operators in the official flow documentation. As collect is a suspend function, it needs to be executed within a coroutine. It takes a lambda as a parameter that is called on every new value.

What is a single item/value function in Kotlin flow?

It’s a Kotlin Flow operator that emits a single item/value after combining the emission of two flow collections via a specified function. This means it zips values from the current flow with other flow and emits them as a single item/value for each combination based on the results of this specified function. Let’s understand with the example.

What is a zip operator in Kotlin flow?

What is a zip operator in Kotlin Flow? Zip Operator is an operator that combines the emissions of two flow collections together via a specified function and emits single items for each combination based on the results of this function. Join and learn Dagger, Kotlin, RxJava, MVVM, Architecture Components, Coroutines, Unit Testing and much more.

How do I make two network calls in parallel in Kotlin?

The ParallelNetworkCallsViewModel, then asks the data layer for the list of users using the ApiHelper. The ViewModel makes the two network calls in parallel which are as getUsers and getMoreUsers. As you can see below, the ViewModel uses the Kotlin Coroutines and LiveData.

How to create a flow in Kotlin for Android?

Kotlin flows on Android 1 Creating a flow. To create flows, use the flow builder APIs. ... 2 Modifying the stream. ... 3 Collecting from a flow. ... 4 Catching unexpected exceptions. ... 5 Executing in a different CoroutineContext. ... 6 Flows in Jetpack libraries. ... 7 Convert callback-based APIs to flows. ... 8 Additional flow resources


2 Answers

Recently, merge operator was added to the Kotlin coroutines version 1.3.3. Here is the merged PR.

Using the merge operator, you should be able to get the result as and when it arrives.

like image 95
ragdroid Avatar answered Oct 12 '22 01:10

ragdroid


Problem here is collect is only called when getDataFromServer returns.

The first problematic thing with your design is that the Flow-returning function is also suspendable. That's two layers of suspendability. Functions should return flows without any delays and the flows themselves should emit items as they come in. If you followed this guideline, your initial code would already work.

The way you wrote these functions, they can still work if you write this:

flow<String> {
    emitAll(getCached())
    emitAll(getFromServer())
}

This statement completes immediately, returning a cold flow. When you call collect on it, it first calls getCached() and emits the cached value, and then calls getFromServer() and emits the server response.


The above solution starts the server call only after you consume the cached value. If you need the two flows to be active concurrently, use flatMapMerge.

Assuming you fixed the above basic problem and made your Flow-returning functions non-suspending, all you need is this:

flowOf(getCached(), getFromServer()).flattenMerge()

If for some reason you can't do that, you have to add the emitAll wrapper around each call:

flowOf(
    flow { emitAll(getCached()) }, 
    flow { emitAll(getFromServer()) }
).flattenMerge()
like image 42
Marko Topolnik Avatar answered Oct 11 '22 23:10

Marko Topolnik