I am building a PUB/SUB system backed by Redis.
I have one publisher and tons of subscribers. The subscribers are not that reliable, they can lose connection at any time and need to be able to "recover" from a lost connection.
There is a twist though, I would like my backlog capped at some number, meaning that a faulty subscriber should be able to recover only up to N messages.
The trivial solution is:
RPUSH list message
PUBLISH channel encoded
If a consumer needs to re-establish:
PSUBSCRIBE
atomicallyUp to here we are all good.
My big question is, what if I want the backlog list to be capped at N items?
Is there any way I can keep an ever increasing index AND a capped backlog in the list?
Redis Pub/Sub is designed for speed (low latency), but only with low numbers of subscribers —subscribers don't poll and while subscribed/connected are able to receive push notifications very quickly from the Redis broker—in the low ms, even < 1ms as confirmed by this benchmark.
Basic Pub-Sub modelStart by opening three terminal sessions and launch the Redis CLI in each of them. Once you have all the terminals open and set up, use one of the terminals to SUBSCRIBE to a channel. The name will entirely depend on you, and you can name it whatever you want.
However a small handful of Redis commands are long-term blocking or non-atomic. The PUBSUB subscribe and psubscribe commands are non-atomic in that the command will register it's request to have any messages sent to the given channel(s) sent to it as well, but the actual data can arrive much later – ie.
More precisely, Redis Pub/Sub is designed for real-time communication between instances where low latency is of the utmost importance, and as such doesn't feature any form of persistence or acknowledgment.
How about this? To publish a message, do
LPUSH list message
LTRIM list 0 N
INCR global_index
PUBLISH channel global_index
When receiving a message over pub/sub and when starting up, clients will need to compare their latest index (this could also be kept in redis, or somewhere else) with global_index and read min(global_index - my_index, N) messages from the list to catch up (basically LRANGE list 0 (global_index - my_index)
).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With