Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use consumer groups with Spring Data Redis for Redis Streams (keep getting NOGROUP)?

I'm trying to use Spring Data Redis to consume a Redis Stream using consumer groups, but keep getting the following exception:

Caused by: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'event-stream' or consumer group 'my-group' in XREADGROUP with GROUP option

The message seems to hint that I first need to create a consumer group? But the documentation does not provide any reference to this: https://github.com/spring-projects/spring-data-redis/blob/master/src/main/asciidoc/reference/redis-streams.adoc

Framework Versions:

  • Spring Boot 2.2.6
  • Lettuce 5.2.2
  • Redis 5.0.8

This is the code I'm using to consume the stream:

@Bean
@Autowired
public StreamMessageListenerContainer eventStreamPersistenceListenerContainerTwo(RedisConnectionFactory streamRedisConnectionFactory, RedisTemplate streamRedisTemplate) {

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder().pollTimeout(Duration.ofMillis(100)).build();

        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(streamRedisConnectionFactory,
                        containerOptions);

        container.receive(Consumer.from("my-group", "my-consumer"),
                        StreamOffset.create("event-stream", ReadOffset.latest()),
                        message -> {
                                System.out.println("MessageId: " + message.getId());
                                System.out.println("Stream: " + message.getStream());
                                System.out.println("Body: " + message.getValue());
                                streamRedisTemplate.opsForStream().acknowledge("my-group", message);
                        });

        /*Subscription subscription = container.receive(StreamOffset.fromStart("event-stream"), message -> {

                System.out.println("MessageId: " + message.getId());
                System.out.println("Stream: " + message.getStream());
                System.out.println("Body: " + message.getValue());
        });*/

        container.start();

        return container;
}

Full Stack Trace:

org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'event-stream' or consumer group 'my-group' in XREADGROUP with GROUP option
    at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:54) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:52) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:270) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.convertLettuceAccessException(LettuceStreamCommands.java:471) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:361) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.DefaultedRedisConnection.xReadGroup(DefaultedRedisConnection.java:529) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations$4.inRedis(DefaultStreamOperations.java:239) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:305) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:300) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations.read(DefaultStreamOperations.java:234) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$3(DefaultStreamMessageListenerContainer.java:236) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:138) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:123) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'event-stream' or consumer group 'my-group' in XREADGROUP with GROUP option
    at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:135) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:108) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.AsyncCommand.completeResult(AsyncCommand.java:120) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.AsyncCommand.complete(AsyncCommand.java:111) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:59) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:654) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:614) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.48.Final.jar:4.1.48.Final]
    ... 1 common frames omitted
like image 703
WickedElephant Avatar asked Apr 27 '20 17:04

WickedElephant


People also ask

Which of these Redis data structures can be used to create consumer groups?

XGROUP is used in order to create, destroy and manage consumer groups. XREADGROUP is used to read from a stream via a consumer group. XACK is the command that allows a consumer to mark a pending message as correctly processed.

Is Redis Streams good for large data?

Redis delivers more than a million read/write operations per second, with sub-millisecond latency on a modestly sized commodity cloud instance, making it extremely resource-efficient for large volumes of data.

Are Redis Streams persisted?

A Stream, like any other Redis data structure, is asynchronously replicated to replicas and persisted into AOF and RDB files.

Which command do you use to add a value to a Redis stream?

XADD is the only Redis command that can add data to a stream, but there are other commands, such as XDEL and XTRIM , that are able to remove data from a stream.


1 Answers

Answering my own question. It seems as though you do need to explicitly create the stream and group first, even though not mentioned anywhere in the docs. Although there should really be a better way to initialize an empty stream other than publishing a message to it.

private void createConsumerGroup(String key, String group, RedisTemplate redisTemplate) {
        try {
                //redisTemplate.opsForStream().createGroup("my-stream", ReadOffset.from("0-0"), "my-group");
                redisTemplate.opsForStream().createGroup("event-stream", "my-group-2");
        } catch (RedisSystemException e) {
                if (e.getRootCause().getClass().equals(RedisBusyException.class)) {
                        log.info("STREAM - Redis group already exists, skipping Redis group creation: my-group-2");
                } else if (e.getRootCause().getClass().equals(RedisCommandExecutionException.class)) {
                        log.info("STREAM - Stream does not yet exist, creating empty stream: event-stream");
                        // TODO: There has to be a better way to create a stream than this!?
                        redisTemplate.opsForStream().add("event-stream", Collections.singletonMap("", ""));
                        redisTemplate.opsForStream().createGroup("event-stream", "my-group-2");
                } else throw e;
        }
}

EDIT: As mentioned by @anstue in the comments below, spring-data-redis 2.3.1+ now automatically creates the stream if it doesn't exist, when calling createGroup. However, it will throw a RedisSystemBusyException if the group does already exist. So I'm updating the answer with the solution I am currently using, making sure to catch this exception.

public class EventStreamUtils {

    public static void createConsumerGroup(String key, String group, RedisTemplate redisTemplate) {
        try {
            // ReadOffset.from("0-0") will start reading stream from the very beginning.  Otherwise,
            // it will pick up at the point in the stream where the new group was created.
            //redisTemplate.opsForStream().createGroup(key, ReadOffset.from("0-0"), group);
            redisTemplate.opsForStream().createGroup(key, group);
        } catch (RedisSystemException e) {
            var cause = e.getRootCause();
            if (cause != null && RedisBusyException.class.equals(cause.getClass())) {
                log.info("STREAM - Redis group already exists, skipping Redis group creation: {}", group);
            } else throw e;
        }
    }
}
like image 94
WickedElephant Avatar answered Oct 27 '22 14:10

WickedElephant