
Kotlin flows as a message queue between coroutines

I'm attempting to use Kotlin's Flow as a message queue to transfer data from a producer (a camera) to a set of workers (image analyzers) running on separate coroutines.

The camera produces images substantially faster than the workers can process them. Back pressure should be handled by dropping data, so that the image analyzers are always operating on the latest images from the camera.
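As a small illustration of the drop strategy described above (a minimal sketch, not the camera code itself): on a rendezvous channel, `trySend` fails immediately when no receiver is waiting, which amounts to dropping the element. The function name `trySendOutcomes` is hypothetical, and the sketch assumes kotlinx.coroutines 1.5 or later, where `trySend` replaced `offer`.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

/** Returns whether trySend succeeded (a) with no receiver waiting and (b) with one waiting. */
fun trySendOutcomes(): Pair<Boolean, Boolean> = runBlocking {
    val channel = Channel<Int>(Channel.RENDEZVOUS)

    // No receiver is waiting yet, so the element is dropped.
    val withoutReceiver = channel.trySend(1).isSuccess

    // UNDISPATCHED runs the coroutine immediately until it suspends inside receive().
    val receiver = async(start = CoroutineStart.UNDISPATCHED) { channel.receive() }

    // A receiver is now waiting, so the element is handed over directly.
    val withReceiver = channel.trySend(2).isSuccess
    receiver.await()

    withoutReceiver to withReceiver
}

fun main() {
    val (withoutReceiver, withReceiver) = trySendOutcomes()
    println("accepted without receiver: $withoutReceiver")
    println("accepted with receiver: $withReceiver")
}
```

Because `trySend` never suspends, a producer using it is never slowed down by busy consumers; it only loses the elements nobody was ready to take.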

The channel-based solution below works, but it seems messy and gives me no easy way to transform the data between the camera and the analyzers (as flow.map would).

class ImageAnalyzer<Result> {
    fun analyze(image: Bitmap): Result {
        // perform some work on the image and return a Result. This can take a long time.
        TODO("analysis is omitted from this example")
    }
}

class CameraAdapter {

    private val imageChannel = Channel<Bitmap>(capacity = Channel.RENDEZVOUS)
    private val imageReceiveMutex = Mutex()

    // additional code to make this camera work and listen to lifecycle events of the enclosing activity.

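    protected fun sendImageToStream(image: Bitmap) {
        // trySend (the replacement for the deprecated offer) never suspends, so runBlocking is not
        // needed. On a rendezvous channel it fails when no worker is waiting, which drops the
        // image and ensures only the latest images are processed.
        imageChannel.trySend(image)
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun onDestroy() {
        // close() is not a suspending function, so it can be called directly
        imageChannel.close()
    }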

    /**
     * Get the stream of images from the camera.
     */
    fun getImageStream(): ReceiveChannel<Bitmap> = imageChannel
}

class ImageProcessor<Result>(private val workers: List<ImageAnalyzer<Result>>) {
    private val analysisResults = Channel<Result>(capacity = Channel.RENDEZVOUS)
    private val cancelMutex = Mutex()

    var finished = false // this can be set elsewhere when enough images have been analyzed

    fun subscribeTo(channel: ReceiveChannel<Bitmap>, processingCoroutineScope: CoroutineScope) {
        // omit some checks to make sure this is not already subscribed

        processingCoroutineScope.launch {
            val workerScope = this
            workers.forEachIndexed { index, worker ->
                launch(Dispatchers.Default) {
                    startWorker(channel, workerScope, index, worker)
                }
            }
        }
    }

    private suspend fun startWorker(
        channel: ReceiveChannel<Bitmap>,
        workerScope: CoroutineScope,
        workerId: Int,
        worker: ImageAnalyzer<Result>
    ) {
        for (bitmap in channel) {
            analysisResults.send(worker.analyze(bitmap))

            cancelMutex.withLock {
                if (finished && workerScope.isActive) {
                    workerScope.cancel()
                }
            }
        }
    }
}

class ExampleApplication : CoroutineScope {
    private val cameraAdapter: CameraAdapter = ...
    private val imageProcessor: ImageProcessor<Result> = ...

    fun analyzeCameraStream() {
        // subscribeTo also needs the scope that the workers are launched in
        imageProcessor.subscribeTo(cameraAdapter.getImageStream(), this)
    }
}

What's the proper way to do this? I would like to use a ChannelFlow instead of a Channel to pass data between the camera and the ImageProcessor. This would allow me to call flow.map to add metadata to the images before they're sent to the analyzers. However, when doing so, each ImageAnalyzer gets a copy of the same image instead of processing different images in parallel. Is it possible to use a Flow as a message queue rather than a broadcaster?
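A minimal sketch of the behaviour described above, assuming the flow in question is cold (built with a builder such as `flow` or `channelFlow`): a cold flow runs its producer block again for every collector, so each collector sees every element. The names `coldFrames` and `collectTwice` are hypothetical, and integers stand in for images.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical cold source: the builder block runs again for every collector.
fun coldFrames(): Flow<Int> = flow {
    for (frameId in 1..3) emit(frameId)
}

/** Collects the same cold flow twice and returns what each collector saw. */
fun collectTwice(): Pair<List<Int>, List<Int>> = runBlocking {
    val frames = coldFrames()
    frames.toList() to frames.toList()
}

fun main() {
    val (first, second) = collectTwice()
    println(first)  // [1, 2, 3]
    println(second) // [1, 2, 3]
}
```

Both collectors receive all three frames, which is the opposite of what a message queue needs.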

Adam Wushensky asked Nov 06 '22 07:11


1 Answer

I got this working with flows! It was important to keep the flows backed by a channel throughout this sequence so that each worker would pick up unique images to operate on. I've confirmed this functionality through unit tests.

Here's my updated code for posterity:

class ImageAnalyzer<Result> {
    fun analyze(image: Bitmap): Result {
        // perform some work on the image and return a Result. This can take a long time.
        TODO("analysis is omitted from this example")
    }
}

class CameraAdapter {

    private val imageChannel = Channel<Bitmap>(capacity = Channel.RENDEZVOUS)
    private val imageReceiveMutex = Mutex()

    // additional code to make this camera work and listen to lifecycle events of the enclosing activity.

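    protected fun sendImageToStream(image: Bitmap) {
        // trySend (the replacement for the deprecated offer) never suspends, so runBlocking is not
        // needed. On a rendezvous channel it fails when no worker is waiting, which enforces the
        // drop back pressure strategy.
        imageChannel.trySend(image)
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun onDestroy() {
        // close() is not a suspending function, so it can be called directly
        imageChannel.close()
    }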

    /**
     * Get the stream of images from the camera.
     */
    fun getImageStream(): Flow<Bitmap> = imageChannel.receiveAsFlow()
}

class ImageProcessor<Result>(private val workers: List<ImageAnalyzer<Result>>) {
    private val analysisResults = Channel<Result>(capacity = Channel.RENDEZVOUS)
    private val cancelMutex = Mutex()

    var finished = false // this can be set elsewhere when enough images have been analyzed

    fun subscribeTo(flow: Flow<Bitmap>, processingCoroutineScope: CoroutineScope): Job {
        // omit some checks to make sure this is not already subscribed

        return processingCoroutineScope.launch {
            val workerScope = this
            workers.forEachIndexed { index, worker ->
                launch(Dispatchers.Default) {
                    startWorker(flow, workerScope, index, worker)
                }
            }
        }
    }

    private suspend fun startWorker(
        flow: Flow<Bitmap>,
        workerScope: CoroutineScope,
        workerId: Int,
        worker: ImageAnalyzer<Result>
    ) {
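        // collect returns once the channel is closed and throws CancellationException once
        // workerScope is cancelled, so no surrounding while loop is needed. Looping here would
        // spin on an already-closed channel until the scope is cancelled.
        flow.collect { bitmap ->
            analysisResults.send(worker.analyze(bitmap))

            cancelMutex.withLock {
                if (finished && workerScope.isActive) {
                    workerScope.cancel()
                }
            }
        }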
    }

    fun getAnalysisResults(): Flow<Result> = analysisResults.receiveAsFlow()
}

class ExampleApplication : CoroutineScope {
    private val cameraAdapter: CameraAdapter = ...
    private val imageProcessor: ImageProcessor<Result> = ...

    fun analyzeCameraStream() {
        // subscribeTo also needs the scope that the workers are launched in
        imageProcessor.subscribeTo(cameraAdapter.getImageStream(), this)
    }
}

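So long as the flow is created from the channel with receiveAsFlow, every collector receives from the same underlying channel in a fan-out fashion, so each image goes to exactly one worker instead of being copied to all of them.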
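The fan-out behaviour can be checked in isolation with a minimal sketch like the one below. It is not the camera code: `TaggedFrame` and `fanOut` are hypothetical names, integers stand in for images, and `send` is used instead of `trySend` so that nothing is dropped and the result is easy to verify. It also shows `map` applied to the channel-backed flow.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for an image with metadata attached by flow.map.
data class TaggedFrame(val frameId: Int, val tag: String)

/** Sends [frameCount] frames to [workerCount] collectors and returns what each one received. */
fun fanOut(frameCount: Int, workerCount: Int): List<List<TaggedFrame>> = runBlocking {
    val channel = Channel<Int>(Channel.RENDEZVOUS)

    // map runs per collector, after that collector has taken an element from the channel.
    val frames = channel.receiveAsFlow().map { TaggedFrame(it, "frame-$it") }

    // Every worker collects the same flow until the channel is closed.
    val workers = List(workerCount) {
        async(Dispatchers.Default) { frames.toList() }
    }

    // send suspends until some worker takes the element, so no frame is lost here.
    repeat(frameCount) { channel.send(it) }
    channel.close() // completes every collector

    workers.awaitAll()
}

fun main() {
    val perWorker = fanOut(frameCount = 20, workerCount = 3)
    val allIds = perWorker.flatten().map { it.frameId }
    println("frames delivered: ${allIds.size}")
    println("distinct frames: ${allIds.toSet().size}")
}
```

Which worker receives which frame varies between runs, but every frame is delivered exactly once across all workers, so the delivered and distinct counts both equal the number of frames sent.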

Adam Wushensky answered Dec 04 '22 00:12