Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to call a coroutine usecase from a rxjava flat map

Hi I have a rxjava flat map in which I want to call a coroutine usecase onStandUseCase which is an api call

Initially the use case was also rxjava based and it used to return Observable<GenericResponse> and it worked fine now that I changed the use to be coroutines based it only returns GenericResponse

how can modify the flatmap to work fine with coroutines use case please

        subscriptions += view.startFuellingObservable
        .onBackpressureLatest()
        .doOnNext { view.showLoader(false) }
        .flatMap {
            if (!hasOpenInopIncidents()) {
                //THIS IS WHERE THE ERROR IS IT RETURNS GENERICRESPONSE
                onStandUseCase(OnStandUseCase.Params("1", "2", TimestampedAction("1", "2", DateTime.now()))) {
                    
                }
               
            } else {
                val incidentOpenResponse = GenericResponse(false)
                incidentOpenResponse.error = OPEN_INCIDENTS
                Observable.just(incidentOpenResponse)
            }
        }
        .subscribe(
            { handleStartFuellingClicked(view, it) },
            { onStartFuellingError(view) }
        )

OnStandUseCase.kt

class OnStandUseCase @Inject constructor(
    private val orderRepository: OrderRepository,
    private val serviceOrderTypeProvider: ServiceOrderTypeProvider
) : UseCaseCoroutine<GenericResponse, OnStandUseCase.Params>() {

    override suspend fun run(params: Params) = orderRepository.notifyOnStand(
        serviceOrderTypeProvider.apiPathFor(params.serviceType),
        params.id,
        params.action
    )

    data class Params(val serviceType: String, val id: String, val action: TimestampedAction)
}

UseCaseCoroutine

abstract class UseCaseCoroutine<out Type, in Params> where Type : Any {

    abstract suspend fun run(params: Params): Type

    operator fun invoke(params: Params, onResult: (type: Type) -> Unit = {}) {
        val job = GlobalScope.async(Dispatchers.IO) { run(params) }
        GlobalScope.launch(Dispatchers.Main) { onResult(job.await()) }
    }
}

startFuellingObservable is

val startFuellingObservable: Observable<Void>

Here is the image of the error

enter image description here

Any suggestion on how to fix this please

thanks in advance R

like image 720
BRDroid Avatar asked Oct 15 '22 22:10

BRDroid


1 Answers

There is the integration library linking RxJava and Kotlin coroutines.

rxSingle can be used to turn a suspend function into a Single. OP wants an Observable, so we can call toObservable() for the conversion.

.flatMap {
    if (!hasOpenInopIncidents()) {
        rxSingle {
            callYourSuspendFunction()
        }.toObservable()
    } else {
        val incidentOpenResponse = GenericResponse(false)
        incidentOpenResponse.error = OPEN_INCIDENTS
        Observable.just(incidentOpenResponse)
    }
}

Note that the Observables in both branches contain just one element. We can make this fact more obvious by using Observable#concatMapSingle.

.concatMapSingle {
    if (!hasOpenInopIncidents()) {
        rxSingle { callYourSuspendFunction() }
    } else {
        val incidentOpenResponse = GenericResponse(false)
        incidentOpenResponse.error = OPEN_INCIDENTS
        Single.just(incidentOpenResponse)
    }
}
like image 94
George Leung Avatar answered Oct 18 '22 12:10

George Leung