Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Node Redis XREAD blocking subscription

I'm converting a redis pub/sub system to redis streams, so that I can add some fault tolerance to my server sent events.

Subscribing the traditional way is trivial:

import { createClient } from 'redis';

const redisOptions = {
  url: `${process.env.REDIS_URL}/0`
}
const redis = createClient(redisOptions);
redis.setMaxListeners(100000);

redis.on("message", (channel, message) => {
  console.log(channel);
  console.log(message);
});

redis.subscribe('foo');

This blocks permanently, and keeps the connection open. Publishing to redis will add to your log, in this case.

const json = { a: 1, b: 2 };

redis.publish('foo', JSON.stringify(json));

Switching over to streams, you use XREAD instead of subscribe, and XADD instead of publish, and the data is dramatically different. The part I'm struggling with is the blocking.

redis.xread('BLOCK', 0, 'STREAMS', 'foo', '$', (err, str) => {
  if (err) return console.error('Error reading from stream:', err);

  str.forEach(message => {
    console.log(message);
  });
}

When sending messages, the first one is picked up by my "subscription", but no further messages are logged.

like image 324
Dudo Avatar asked Jun 03 '20 18:06

Dudo


Video Answer


1 Answers

Truth be told, I only asked this question because google was no good to me, and I couldn't find anyone else posting about this problem. Hope this helps!

So, XREAD is only blocking on the initial call. It will sit and wait, for a set time period (or indefinitely if you set the time to 0), but once it receives data, its duty is considered fulfilled, and it unblocks. To keep the "subscription" alive, you need to call XREAD again, with the most recent id from the stream. This replaces the initial $ value we passed it.

Recursion seemed like a perfect solution:

const xread = ({ stream, id }) => {
  redis.xread('BLOCK', 0, 'STREAMS', stream, id, (err, str) => {
    if (err) return console.error('Error reading from stream:', err);

    str[0][1].forEach(message => {
      id = message[0];
      console.log(id);
      console.log(message[1]);
    });

    setTimeout(() => xread({ stream, id }), 0)
  });
}

xread({ stream: 'asdf', id: '$' })

like image 70
Dudo Avatar answered Sep 22 '22 10:09

Dudo