I'm converting a Redis pub/sub system to Redis Streams so that I can add some fault tolerance to my server-sent events.
Subscribing the traditional way is trivial:
import { createClient } from 'redis';

const redisOptions = {
  url: `${process.env.REDIS_URL}/0`
};
const redis = createClient(redisOptions);
redis.setMaxListeners(100000);

redis.on("message", (channel, message) => {
  console.log(channel);
  console.log(message);
});
redis.subscribe('foo');
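The subscription keeps the connection open indefinitely, and every message published to foo is logged by the handler above. A connection in subscriber mode can't issue other commands, so publish from a second connection:
const publisher = redis.duplicate();
const json = { a: 1, b: 2 };
publisher.publish('foo', JSON.stringify(json));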
Switching over to streams, you use XREAD instead of subscribe and XADD instead of publish, and the data comes back in a very different shape: entries with an ID and field/value pairs rather than a channel and a string. The part I'm struggling with is the blocking.
redis.xread('BLOCK', 0, 'STREAMS', 'foo', '$', (err, str) => {
  if (err) return console.error('Error reading from stream:', err);
  str.forEach(message => {
    console.log(message);
  });
});
When sending messages, the first one is picked up by my "subscription", but no further messages are logged.
Truth be told, I only asked this question because Google was no help, and I couldn't find anyone else posting about this problem. Hope this helps!
So, XREAD only blocks for a single call. It will sit and wait for a set number of milliseconds (or indefinitely if you set the timeout to 0), but once it receives data, its duty is considered fulfilled and it returns. To keep the "subscription" alive, you need to call XREAD again with the ID of the most recent entry you received. This replaces the initial $ value we passed it, so nothing added between calls is missed.
Recursion seemed like a perfect solution:
const xread = ({ stream, id }) => {
  redis.xread('BLOCK', 0, 'STREAMS', stream, id, (err, str) => {
    if (err) return console.error('Error reading from stream:', err);
    // str[0] is [streamName, entries]; each entry is [entryId, fields]
    str[0][1].forEach(message => {
      id = message[0]; // remember the last ID seen
      console.log(id);
      console.log(message[1]);
    });
    // Schedule the next blocking read, resuming after the last ID
    setTimeout(() => xread({ stream, id }), 0);
  });
};

xread({ stream: 'asdf', id: '$' });
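For anyone who prefers a loop to recursion, here is a rough sketch of the same idea with async/await. It is not tied to any particular client: followStream, fieldsToObject and the read adapter are names I made up for illustration, and read is assumed to resolve to the raw XREAD reply (or null if a non-zero BLOCK timeout expires). It also turns the flat field/value array that message[1] holds above into a plain object.

```javascript
// Convert the flat [field, value, field, value, ...] array that Redis
// returns for a stream entry into a plain object.
const fieldsToObject = (fields) => {
  const obj = {};
  for (let i = 0; i + 1 < fields.length; i += 2) {
    obj[fields[i]] = fields[i + 1];
  }
  return obj;
};

// Follow one stream until shouldContinue() returns false.
// `read(stream, id)` is a hypothetical adapter supplied by the caller. It must
// resolve to the raw XREAD reply, [[streamName, [[entryId, fields], ...]]],
// or to null when the BLOCK timeout expires without data.
const followStream = async ({ read, stream, id = '$', onEntry, shouldContinue = () => true }) => {
  while (shouldContinue()) {
    const reply = await read(stream, id);
    if (!reply) continue; // timed out with no data; ask again with the same ID
    for (const [, entries] of reply) {
      for (const [entryId, fields] of entries) {
        id = entryId; // resume after the last entry we have seen
        onEntry(entryId, fieldsToObject(fields));
      }
    }
  }
  return id; // last ID seen, handy for resuming later
};

// Possible adapter for the callback-style client used above (assumption:
// `redis` is that client, on a connection used only for blocking reads):
//
// const read = (stream, id) => new Promise((resolve, reject) =>
//   redis.xread('BLOCK', 0, 'STREAMS', stream, id,
//     (err, reply) => (err ? reject(err) : resolve(reply))));
//
// followStream({ read, stream: 'asdf', onEntry: (id, data) => console.log(id, data) })
//   .catch(err => console.error('Error reading from stream:', err));
```

A rejected read ends the loop and rejects the returned promise, so the caller decides whether to retry. Since a blocking XREAD ties up its connection while it waits, it is probably safest to issue XADD from a different connection.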