I'm attempting to use Kotlin's Flow class as a message queue to transfer data from a producer (a camera) to a set of workers (image analyzers) running on separate coroutines.
The camera produces images substantially faster than the workers can process them. Back pressure should be handled by dropping images, so that the image analyzers always operate on the latest images from the camera.
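For illustration, here is a minimal sketch of two ways a channel can drop data under back pressure. It assumes kotlinx.coroutines 1.5 or later (where trySend is available); Ints stand in for images, and dropDemo is a hypothetical name used only for this example.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical demo: Ints stand in for camera frames.
fun dropDemo(): Pair<Boolean, Int?> {
    // Rendezvous channel: trySend fails immediately when no receiver is waiting,
    // so the frame is simply dropped.
    val rendezvous = Channel<Int>(Channel.RENDEZVOUS)
    val accepted = rendezvous.trySend(1).isSuccess

    // Conflated channel: trySend always succeeds and overwrites the previous value,
    // so a slow receiver only ever sees the newest frame.
    val conflated = Channel<Int>(Channel.CONFLATED)
    conflated.trySend(1)
    conflated.trySend(2)
    conflated.trySend(3)
    val latest = conflated.tryReceive().getOrNull()

    return accepted to latest
}

fun main() {
    println(dropDemo()) // (false, 3)
}
```

A rendezvous channel drops the new frame when every worker is busy, while a conflated channel keeps one pending frame and replaces it with the newest; which of the two fits better depends on whether a slightly stale frame is acceptable.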
The channel-based solution below works, but it seems messy and gives me no easy way to transform the data between the camera and the analyzers (as flow.map would).
class ImageAnalyzer<Result> {
    fun analyze(image: Bitmap): Result {
        // perform some work on the image and return a Result. This can take a long time.
        TODO("analysis is application-specific")
    }
}
class CameraAdapter {
    private val imageChannel = Channel<Bitmap>(capacity = Channel.RENDEZVOUS)
    private val imageReceiveMutex = Mutex()

    // additional code to make this camera work and listen to lifecycle events of the enclosing activity.

    protected fun sendImageToStream(image: Bitmap) {
        // trySend never suspends: if no worker is ready to receive, the image is dropped,
        // which ensures that only the latest images are processed
        imageChannel.trySend(image)
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun onDestroy() {
        // close is a regular (non-suspending) function
        imageChannel.close()
    }

    /**
     * Get the stream of images from the camera.
     */
    fun getImageStream(): ReceiveChannel<Bitmap> = imageChannel
}
class ImageProcessor<Result>(private val workers: List<ImageAnalyzer<Result>>) {
    private val analysisResults = Channel<Result>(capacity = Channel.RENDEZVOUS)
    private val cancelMutex = Mutex()

    var finished = false // this can be set elsewhere when enough images have been analyzed

    fun subscribeTo(channel: ReceiveChannel<Bitmap>, processingCoroutineScope: CoroutineScope) {
        // omit some checks to make sure this is not already subscribed
        processingCoroutineScope.launch {
            val workerScope = this
            workers.forEachIndexed { index, worker ->
                launch(Dispatchers.Default) {
                    startWorker(channel, workerScope, index, worker)
                }
            }
        }
    }

    private suspend fun startWorker(
        channel: ReceiveChannel<Bitmap>,
        workerScope: CoroutineScope,
        workerId: Int,
        worker: ImageAnalyzer<Result>
    ) {
        // iterating a shared channel is fan-out: each bitmap is received by exactly one worker
        for (bitmap in channel) {
            analysisResults.send(worker.analyze(bitmap))
            cancelMutex.withLock {
                if (finished && workerScope.isActive) {
                    workerScope.cancel()
                }
            }
        }
    }
}
class ExampleApplication : CoroutineScope {
    private val cameraAdapter: CameraAdapter = ...
    private val imageProcessor: ImageProcessor<Result> = ...

    fun analyzeCameraStream() {
        // this class is itself a CoroutineScope, so it serves as the processing scope
        imageProcessor.subscribeTo(cameraAdapter.getImageStream(), this)
    }
}
What's the proper way to do this? I would like to use a ChannelFlow instead of a Channel to pass data between the camera and the ImageProcessor. This would allow me to call flow.map to add metadata to the images before they're sent to the analyzers. However, when doing so, each ImageAnalyzer gets a copy of the same image instead of processing different images in parallel. Is it possible to use a Flow as a message queue rather than a broadcaster?
I got this working with flows! It was important to keep the flows backed by a channel throughout this sequence so that each worker would pick up unique images to operate on. I've confirmed this functionality through unit tests.
Here's my updated code for posterity:
class ImageAnalyzer<Result> {
    fun analyze(image: Bitmap): Result {
        // perform some work on the image and return a Result. This can take a long time.
        TODO("analysis is application-specific")
    }
}
class CameraAdapter {
    private val imageChannel = Channel<Bitmap>(capacity = Channel.RENDEZVOUS)
    private val imageReceiveMutex = Mutex()

    // additional code to make this camera work and listen to lifecycle events of the enclosing activity.

    protected fun sendImageToStream(image: Bitmap) {
        // trySend never suspends: if no worker is ready to receive, the image is dropped,
        // which enforces the drop back pressure strategy
        imageChannel.trySend(image)
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun onDestroy() {
        // close is a regular (non-suspending) function
        imageChannel.close()
    }

    /**
     * Get the stream of images from the camera.
     */
    fun getImageStream(): Flow<Bitmap> = imageChannel.receiveAsFlow()
}
class ImageProcessor<Result>(private val workers: List<ImageAnalyzer<Result>>) {
    private val analysisResults = Channel<Result>(capacity = Channel.RENDEZVOUS)
    private val cancelMutex = Mutex()

    var finished = false // this can be set elsewhere when enough images have been analyzed

    fun subscribeTo(flow: Flow<Bitmap>, processingCoroutineScope: CoroutineScope): Job {
        // omit some checks to make sure this is not already subscribed
        return processingCoroutineScope.launch {
            val workerScope = this
            workers.forEachIndexed { index, worker ->
                launch(Dispatchers.Default) {
                    startWorker(flow, workerScope, index, worker)
                }
            }
        }
    }

    private suspend fun startWorker(
        flow: Flow<Bitmap>,
        workerScope: CoroutineScope,
        workerId: Int,
        worker: ImageAnalyzer<Result>
    ) {
        // collect suspends until the backing channel is closed or this coroutine is cancelled,
        // so no surrounding loop is needed (re-collecting a closed channel would return at once and spin)
        flow.collect { bitmap ->
            analysisResults.send(worker.analyze(bitmap))
            cancelMutex.withLock {
                if (finished && workerScope.isActive) {
                    workerScope.cancel()
                }
            }
        }
    }

    fun getAnalysisResults(): Flow<Result> = analysisResults.receiveAsFlow()
}
class ExampleApplication : CoroutineScope {
    private val cameraAdapter: CameraAdapter = ...
    private val imageProcessor: ImageProcessor<Result> = ...

    fun analyzeCameraStream() {
        // this class is itself a CoroutineScope, so it serves as the processing scope
        imageProcessor.subscribeTo(cameraAdapter.getImageStream(), this)
    }
}
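It appears that, so long as the flow is backed by a channel (here via receiveAsFlow), each image is delivered to exactly one collector, so the workers each get a unique image rather than a copy of the same one.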
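The fan-out behavior can be checked in isolation. The following is a minimal sketch, not the code above: it assumes kotlinx.coroutines 1.6 or later, uses Ints in place of Bitmaps, and fanOut is a hypothetical helper name. It also applies map before the workers collect, which was the original motivation for moving to flows.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for the camera/analyzer pipeline: Ints play the role of images.
// Returns (workerId, frame) pairs in the order the workers recorded them.
suspend fun fanOut(frameCount: Int, workerCount: Int): List<Pair<Int, String>> {
    val frames = Channel<Int>(Channel.RENDEZVOUS)
    // map runs inside whichever worker receives the element,
    // so each frame is still handled by exactly one worker
    val tagged = frames.receiveAsFlow().map { "frame-$it" }
    val seen = ConcurrentLinkedQueue<Pair<Int, String>>()

    coroutineScope {
        repeat(workerCount) { workerId ->
            launch(Dispatchers.Default) {
                tagged.collect { frame -> seen.add(workerId to frame) }
            }
        }
        // send suspends until a worker is ready, so nothing is dropped in this demo
        repeat(frameCount) { frames.send(it) }
        frames.close() // lets every collector complete
    }
    return seen.toList()
}

fun main() = runBlocking {
    val results = fanOut(frameCount = 10, workerCount = 3)
    // every frame shows up once, regardless of which worker handled it
    println(results.map { it.second }.sorted())
}
```

Which worker receives which frame varies from run to run, but each frame appears exactly once in the result. A flow that is not backed by a shared channel (for example one built with flow { ... }) would instead re-run its producer for every collector, which matches the "each analyzer gets a copy" behavior described in the question.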