Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Best way to build a "task queue" with RxJava

Currently I'm working on a lot of network-related features. At the moment, I'm dealing with a network channel that allows me to send 1 single piece of information at a time, and I have to wait for it to be acknowledged before I can send the next piece of information. I'm representing the server with 1..n connected clients.

Some of these messages, I have to send in chunks, which is fairly easy to do with RxJava. Currently my "writing" method looks sort of like this:

fun write(bytes: ByteArray, ignoreMtu: Boolean) = 
    server.deviceList()
            .first(emptyList())
            .flatMapObservable { devices ->
                Single.fromCallable {
                    if (ignoreMtu) {
                        bytes.size
                    } else {
                        devices.minBy { device -> device.mtu }?.mtu ?: DEFAULT_MTU
                    }
                }
                        .flatMapObservable { minMtu ->
                            Observable.fromIterable(bytes.asIterable())
                                    .buffer(minMtu)
                        }
                        .map { it.toByteArray() }
                        .doOnNext { server.currentData = bytes }
                        .map { devices }
                        // part i've left out: waiting for each device acknowledging the message, timeouts, etc.
            }

What's missing in here is the part where I only allow one piece of information to be sent at the same time. Also, what I require is that if I'm adding a message into my queue, I have to be able to observe the status of only this message (completed, error).

I've thought about what's the most elegant way to achieve this. Solutions I've came up with include for example a PublishSubject<ByteArray> in which I push the messages (queue-like), add a subscriber and observe it - but this would throw for example onError if the previous message failed.

Another way that crossed my mind was to give each message a number upon creating / queueing it, and have a global "message-counter" Observable which I'd hook into the chain's beginning with a filter for the currently sent message == MY_MESSAGE_ID. But this feels kind of fragile. I could increment the counter whenever the subscription terminates, but I'm sure there must be a better way to achieve my goal.

Thanks for your help.

like image 329
damian Avatar asked Jan 24 '26 21:01

damian


1 Answers

For future reference: The most straight-forward approach I've found is to add a scheduler that's working on a single thread, thus working each task sequential.

like image 116
damian Avatar answered Jan 26 '26 23:01

damian