I'm just getting started with Redis, Lettuce, and asynchronous programming in general. Sadly, I can't find any examples of how to get the message from the listener into my program, and neither the Javadoc nor anything else I've found on those functions helps much. Could someone explain how to get a published message into a String?
My code at the moment looks like this:
RedisClient client = RedisClient.create("redis://" + host + "/0");
StatefulRedisPubSubConnection<String, String> con = client.connectPubSub();
RedisPubSubListener<String, String> listener = new RedisPubSubListener<String, String>() {@Override methods to be implemented???}
con.addListener(listener);
RedisPubSubCommands<String, String> sync = con.sync();
sync.subscribe("channel");
I'm pretty sure I have to implement the message methods of the listener, but I have no clue where to even start with that. I know what the parameters stand for, but those methods have a return type of void, so they won't hand me any messages either.
So, where to even get started? (totally puzzled)
You're off to a good start. Redis Pub/Sub involves at least two parties:
A subscriber (guess that's not much of a surprise) subscribes to channels, patterns, or both.
A publisher publishes messages to a channel.
This setup needs to be reflected in your code as well.
I extended your code a bit by using RedisPubSubAdapter, so the code does not need to implement all the methods, just the ones we're interested in, like message(channel, message):
RedisClient client = RedisClient.create("redis://" + host + "/0");
StatefulRedisPubSubConnection<String, String> con = client.connectPubSub();
RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {
    @Override
    public void message(String channel, String message) {
        System.out.println(String.format("Channel: %s, Message: %s", channel, message));
    }
};
con.addListener(listener);
RedisPubSubCommands<String, String> sync = con.sync();
sync.subscribe("channel");
Once the listener is added and the client subscribes to a channel, it is ready to receive Pub/Sub messages. lettuce will call your listener's methods as notifications arrive. At this point, it's important to understand that notifications are processed on an I/O thread that is different from the thread that is setting up the client and subscriptions.
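Because message(channel, message) returns void and runs on the I/O thread, one common way to get the payload into a String on your own thread is to hand it over through a java.util.concurrent.BlockingQueue. The following is only a minimal sketch of that idea, not Lettuce API: the class MessageHandOff and the methods onMessage and nextMessage are hypothetical names, and a plain thread simulates the I/O thread so the example runs without a Redis server. In real code, the body of onMessage would go into the message(String channel, String message) override of the RedisPubSubAdapter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class MessageHandOff {

    // Queue shared between the (simulated) I/O thread and the consuming thread.
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();

    // Hypothetical stand-in for RedisPubSubAdapter.message(channel, message):
    // instead of printing, it hands the payload over to the queue.
    public void onMessage(String channel, String message) {
        inbox.offer(message);
    }

    // Waits up to timeoutMillis for the next message; returns null on timeout.
    public String nextMessage(long timeoutMillis) throws InterruptedException {
        return inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Simulates two notifications arriving on another thread and collects them.
    public static List<String> demo() {
        MessageHandOff handOff = new MessageHandOff();
        Thread ioThread = new Thread(() -> {
            handOff.onMessage("channel", "Message 1");
            handOff.onMessage("channel", "Message 2");
        }, "simulated-io-thread");
        ioThread.start();

        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < 2; i++) {
                // The message is now an ordinary String on the calling thread.
                String message = handOff.nextMessage(1000);
                received.add(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        return received;
    }

    public static void main(String[] args) {
        for (String message : demo()) {
            System.out.println("Got: " + message);
        }
    }
}
```

The queue keeps the listener itself short, which matters because anything slow inside the callback would hold up the I/O thread.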
Let's come to the sender side. To send messages to your channel, you need an additional connection (alternatively, use redis-cli and issue PUBLISH channel message).
StatefulRedisConnection<String, String> sender = client.connect();
sender.sync().publish("channel", "Message 1");
sender.sync().publish("channel", "Message 2");
Redis will publish the messages Message 1 and Message 2 on the channel named channel (not a creative name, but it will do the job for now).
If you run the code sequentially and wait a bit after a message is sent, there's a pretty high chance that the listener is notified and you see output like:
Channel: channel, Message: Message 1
Channel: channel, Message: Message 2
Now comes the tricky part: asynchronicity. Asynchronous communication is beneficial in some cases but adds complexity. It pays off when you can do other work (some computation) before you need the result, or when you just want to initiate I/O and free the thread you're working on. Server applications are good environments for asynchronous patterns: a typical server has limited threading resources and runs until it's shut down. On server startup, you would register a subscription. As soon as a message comes in, it's processed on an I/O thread and your listener is called.
When using asynchronous command execution in a standalone application (say a simple main), you have a sequential flow. With asynchronous messaging, your program may exit as soon as the code flow is finished, which does not necessarily mean that the Pub/Sub messages were received or processed. If you ran the two code blocks one after the other in a main, you would most probably not see any output at all because the program terminates faster than the I/O can happen. This is where synchronization comes into play. There are countless ways to deal with synchronization, but let's take a look at two alternatives for now:
CountDownLatch: Requires a number of things to happen before it releases the program flow
Thread.sleep(…): Wait a number of milliseconds
CountDownLatch use
final CountDownLatch latch = new CountDownLatch(2);
RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {
    @Override
    public void message(String channel, String message) {
        System.out.println(String.format("Channel: %s, Message: %s", channel, message));
        latch.countDown();
    }
};
// ...
sender.sync().publish("channel", "Message 2");
latch.await();
In the code above, a CountDownLatch is armed to count down twice (latch.countDown()). Calling latch.await() blocks the main thread (program flow) and makes it wait until the CountDownLatch has been counted down to zero, which then releases the program to continue.
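A plain latch.await() waits indefinitely if one of the expected messages never arrives. A bounded wait with await(timeout, unit), which is part of the standard CountDownLatch API, is one way to guard against that. The sketch below is an illustration under assumptions rather than Lettuce code: LatchWithTimeout and awaitMessages are hypothetical names, and a plain thread stands in for the I/O thread that would call message(...).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchWithTimeout {

    // Returns true if `expected` notifications arrived within the timeout,
    // false if the timeout elapsed first. `delivered` controls how many
    // notifications the simulated I/O thread actually produces.
    static boolean awaitMessages(int expected, int delivered, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expected);

        // Simulated I/O thread: each countDown() stands in for one call
        // to the listener's message(channel, message) method.
        Thread ioThread = new Thread(() -> {
            for (int i = 0; i < delivered; i++) {
                latch.countDown();
            }
        }, "simulated-io-thread");
        ioThread.setDaemon(true);
        ioThread.start();

        try {
            // Bounded wait instead of the unbounded latch.await().
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitMessages(2, 2, 1000)); // both messages arrive
        System.out.println(awaitMessages(2, 1, 200));  // one message is missing
    }
}
```

The boolean result lets the caller decide what to do when a message is missing, for example log a warning and shut down instead of hanging.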
Thread.sleep(…) use
RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {
    @Override
    public void message(String channel, String message) {
        System.out.println(String.format("Channel: %s, Message: %s", channel, message));
    }
};
con.addListener(listener);
RedisPubSubCommands<String, String> sync = con.sync();
sync.subscribe("channel");
StatefulRedisConnection<String, String> sender = client.connect();
sender.sync().publish("channel", "Message 1");
sender.sync().publish("channel", "Message 2");
Thread.sleep(1000);
This code uses Thread.sleep(1000); to wait for a second (on the main thread). That should usually be enough to receive messages. Don't do this. This approach is quick and dirty and might be okay for playing and debugging, but avoid Thread.sleep in production code.
Redis imposes a constraint on connections that subscribe to a channel/pattern: once subscribed, you are only allowed to execute SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, PING and QUIT commands. PUBLISH is not allowed to be executed on that connection. Hence you're required to use an additional connection.