How to continuously listen on a Redis stream using the Lettuce Java library

I am trying to listen on a Redis stream and process messages as they arrive. I am using the async commands and expect messages to be pushed to me rather than pulled, so I don't think a while loop is required. However, the following code does not work as I expected.

public static void main(String[] args) throws InterruptedException {

    RedisClient redisClient = RedisClient
        .create("redis://localhost:6379/");
    StatefulRedisConnection<String, String> connection
        = redisClient.connect();
    RedisAsyncCommands<String, String> commands = connection.async();
    commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs());
    commands
        .xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream"))
        .thenAccept(System.out::println);

    Thread.currentThread().join();
}

It only prints whatever the stream contains when the program starts and does not print messages that are added while the program is running. Isn't the callback supposed to be called for every new message added to the stream?

asked Nov 25 '22 23:11 by falcon


1 Answer

I know this question is a bit old, but the answer could be helpful to someone else. You can repeatedly subscribe to the same Flux, as shown below; this worked for me with xread. I think the same should work for xreadgroup as well.

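// `connection` and `sink` are defined elsewhere; `sink` is presumably a Reactor Sinks.Many<String>.
RedisPubSubReactiveCommands<String, String> commands = connection.reactive();
commands.xread(new XReadArgs().block(Duration.ofSeconds(20)), XReadArgs.StreamOffset.from("some-stream", "$"))
                // Forward the "key" field of each stream entry to the sink.
                .doOnNext(msg -> sink.tryEmitNext(msg.getBody().get("key")))
                // Re-subscribe each time the blocking read completes, which issues another XREAD.
                .repeat()
                .subscribe();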
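
As for why the code in the question prints only once: the async xreadgroup call returns a future that completes a single time, with whatever that one read returned, so the thenAccept callback is not invoked again for later entries. To keep listening, the read has to be issued again after each completion, which is what .repeat() does in the reactive version. Here is a minimal sketch of that re-issue pattern using only the JDK. It is not Lettuce code: the BlockingQueue is an in-memory stand-in for the stream, and readOnce, listen, run and ReissueReadDemo are hypothetical names made up for illustration.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class ReissueReadDemo {

    // Hypothetical stand-in for one blocking stream read: the future completes
    // exactly once, with the next entry or with empty if the block time elapses.
    static CompletableFuture<Optional<String>> readOnce(BlockingQueue<String> stream, long blockMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return Optional.ofNullable(stream.poll(blockMillis, TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.<String>empty();
            }
        });
    }

    // Handles the result of one read, then issues the next read from the callback.
    // Without the nested listen(...) call, the handler would fire at most once.
    static void listen(BlockingQueue<String> stream, Consumer<String> handler, AtomicBoolean running) {
        if (!running.get()) {
            return;
        }
        readOnce(stream, 100).thenAcceptAsync(entry -> {
            entry.ifPresent(handler);
            listen(stream, handler, running);
        });
    }

    // Starts the listener first, then adds entries while it is already running.
    static List<String> run(List<String> messages) {
        BlockingQueue<String> stream = new LinkedBlockingQueue<>();
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.size());
        AtomicBoolean running = new AtomicBoolean(true);

        listen(stream, entry -> {
            seen.add(entry);
            done.countDown();
        }, running);

        try {
            for (String message : messages) {
                stream.put(message);
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            running.set(false); // the in-flight read times out and the chain ends
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("m1", "m2", "m3"))); // prints [m1, m2, m3]
    }
}
```

Only one read is in flight at a time and each entry is handled before the next read is issued, so entries are seen in arrival order. With a real stream the same shape would apply, assuming the read is issued with a block timeout so that it waits for new entries instead of returning immediately.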
answered Dec 21 '22 14:12 by Lakmal