Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to get a message from a lettuce RedisPubSubListener in Java?

I'm just getting started with redis, lettuce and asynchronous coding at all. Now sadly I fail to find any examples on how to get the message from the listener into my program. Nor does the javadoc or any other info I find on those functions help a lot. So could someone explain how to get a published message into a string?

My code at the moment looks like this:

RedisClient client = RedisClient.create("redis://" + host + "/0");
StatefulRedisPubSubConnection<String, String> con = client.connectPubSub();
RedisPubSubListener<String, String> listener = new RedisPubSubListener<String, String>() {@Override methods to be implemented???}
con.addListener(listener);
RedisPubSubCommands<String, String> sync = con.sync();
sync.subscribe("channel");

I'm pretty sure I have to implement the message methods of the listener, but I have no clue were even to start with that. I know what the params are standing for... but those methods have a return value of void, so they wont output me any messages either.

So, where to even get started? (totally puzzled)

like image 813
Anders Bernard Avatar asked Nov 28 '16 08:11

Anders Bernard


People also ask

How does Redis cluster connect to lettuce?

Connect to a Redis Cluster using pub/sub connections. Create a new client that connects to the supplied uri with shared ClientResources . Create a new client that connects to the supplied uri with shared ClientResources . Create a new client that connects to the supplied uri with shared ClientResources .

What is LettuceConnectionFactory?

public class LettuceConnectionFactory extends Object implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory. Connection factory creating Lettuce-based connections. This factory creates a new LettuceConnection on each call to getConnection() .

Which Java Redis client should you use if you need to write a non-blocking reactive application but don't need distributed Java objects?

Non-blocking API for Redis. Lettuce is a scalable thread-safe Redis client based on netty and Reactor. Lettuce provides synchronous, asynchronous and reactive APIs to interact with Redis.

What is lettuce client?

What is Lettuce? Lettuce is a Redis Java client that is fully non-blocking. It supports both synchronous and asynchronous communication. Its complex abstractions allow you to scale products easily. Consider Lettuce as a more advanced client that supports Cluster, Sentinel, Pipelining, and codecs.


1 Answers

You did a good start. Redis Pub/Sub involves at least two parties:

  1. A subscriber
  2. and a publisher

A subscriber (guess that's not much of a surprise) subscribes to channels, patterns or both.

The publisher publishes messages to a channel. This setup is required to be reflected in your code as well.

I extended your code a bit by using RedisPubSubAdapter, so the code does not require to implement all the methods, just the ones we're interested in, like message(channel, message):

RedisClient client = RedisClient.create("redis://" + host + "/0");
StatefulRedisPubSubConnection<String, String> con = client.connectPubSub();

RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {

    @Override
    public void message(String channel, String message) {
        System.out.println(String.format("Channel: %s, Message: %s", channel, message));
    }
};

con.addListener(listener);
RedisPubSubCommands<String, String> sync = con.sync();
sync.subscribe("channel");

Once the listener is added and the client subscribes to a channel, it is ready to receive Pub/Sub messages. lettuce will call your listener's methods as notifications arrive. At this point, it's important to understand that notifications are processed on an I/O thread that is different from the thread that is setting up the client and subscriptions.

Let's come to the sender side. To send messages to your channel, you require an additional connection (alternatively use redis-cli and issue PUBLISH channel message).

StatefulRedisConnection<String, String> sender = client.connect();

sender.sync().publish("channel", "Message 1");
sender.sync().publish("channel", "Message 2");

Redis will publish the messages Message 1 and Message 1 on the channel named channel (not a creative name, but it will do the job for now).

If you execute the code consecutively and wait a bit once a message was sent, you've got a pretty high chance the that the listener is notified and you see some system output like:

Channel: channel, Message: Message 1
Channel: channel, Message: Message 2

Asynchronous: What's the effect?

Now comes the tricky part with asynchronicity. Using asynchronous communication is beneficial in some cases but adds complexity. In cases where you can do work in (some computation until you need the result) before the result arrives or where you want just to initiate I/O and free the thread you're working on. Server applications are good environments for asynchronous patterns. A typical server has limited threading resources and it's running until it's shut down. On server startup, you would register a subscription. As soon as messages come in, it's processed on an I/O thread and your listener is called

When using asynchronous command execution in a standalone application (say a simple main) then you have a sequential flow. Asynchronous messaging will cause you program to exit once the code flow is finished. This does not necessarily mean that Pub/Sub messages were received or processed. If you would run the two code blocks one by one in a main you would most probably not see any output at all because the program terminates faster than I/O can happen. Now comes synchronization into play. There are gazillion possibilities how to deal with synchronization, but let's take a look at two alternatives for now:

  1. CountDownLatch: Requires a number of things to happen before it releases the program flow
  2. Thread.sleep(…): Wait a number of milliseconds

CountDownLatch use

final CountDownLatch latch = new CountDownLatch(2);
RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {

    @Override
    public void message(String channel, String message) {
        System.out.println(String.format("Channel: %s, Message: %s", channel, message));
        latch.countDown();
    }
};

// ...
sender.sync().publish("channel", "Message 2");

latch.await();

In this code above, a CountDownLatch is armed to count down twice (latch.countDown()). Calling latch.await() blocks the main thread (program flow) and causes it to wait until the CountDownLatch is counted down and hence releases the program to continue.

Thread.sleep(…) use

RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {

    @Override
    public void message(String channel, String message) {
        System.out.println(String.format("Channel: %s, Message: %s", channel, message));
    }
};

con.addListener(listener);
RedisPubSubCommands<String, String> sync = con.sync();
sync.subscribe("channel");

StatefulRedisConnection<String, String> sender = client.connect();

sender.sync().publish("channel", "Message 1");
sender.sync().publish("channel", "Message 2");

latch.await();

Thread.sleep(1000);

This code uses Thread.sleep(1000); to wait for a second (on the main thread). That should usually be enough to receive messages. Don't do this. This approach is quick and dirty and might be okay for playing and debugging but avoid Thread.sleep in reasonable code.

The thing with two participants

Redis imposes a constraint on connections that subscribe to a channel/pattern: once subscribed, you are only allowed to execute SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, PING and QUIT commands. PUBLISH is not allowed to be executed on that connection. Hence you're required to use an additional connection.

like image 81
mp911de Avatar answered Oct 15 '22 04:10

mp911de