DefaultConsumer
My DemoConsumer inherits from DefaultConsumer.
I have noticed that, working this way, handleDelivery() is invoked from a thread pool (printing Thread.currentThread().getName(), I see pool-1-thread-1/2/3/4 each time).
I have also tested it several times and saw that the message order is preserved.
Just to make sure - since different threads call handleDelivery() - can this mess up my message order?
QueueingConsumer
All of the Java tutorials use QueueingConsumer to consume messages.
In the API docs it is marked as a deprecated class.
Should I change my code to inherit from DefaultConsumer instead of using it? Is the tutorial outdated?
Thanks.
The RabbitMQ Java client library allows Java and JVM-based applications to connect to and interact with RabbitMQ nodes. The 5.x release series of this library requires JDK 8, both for compilation and at runtime. On Android, this means only Android 7.0 or later versions are supported.
A connection can be closed via the RabbitMQ Management Interface. Open the Connections tab and click the connection. Go to the bottom of the page, press Close this connection, and then press Force Close.
A consumer tag is a consumer identifier which can be either client- or server-generated. To let RabbitMQ generate a node-wide unique tag, use a Channel#basicConsume override that doesn't take a consumer tag argument or pass an empty string for consumer tag and use the value returned by Channel#basicConsume.
In RabbitMQ, a Channel is not thread-safe, so it is better to give each thread its own channel rather than share one channel between threads.
Yes, DefaultConsumer callbacks are dispatched on the connection's internal thread pool, which can be replaced by supplying your own ExecutorService:
ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
See the “Advanced Connection options” section of http://www.rabbitmq.com/api-guide.html.
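On the ordering question: seeing several pool thread names does not by itself mean deliveries run concurrently. As far as I know, the client hands deliveries for one channel to the pool one at a time. The following is only a sketch of that general technique in plain Java, not the client's actual code; SerialDispatcher and runDemo are hypothetical names made up for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper (not part of the RabbitMQ client): runs tasks on a shared
// pool, but never starts a task before the previous one has finished.
class SerialDispatcher implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor pool;
    private Runnable active;

    SerialDispatcher(Executor pool) {
        this.pool = pool;
    }

    @Override
    public synchronized void execute(Runnable task) {
        // Wrap the task so that finishing it triggers the next one in the queue.
        tasks.add(() -> {
            try {
                task.run();
            } finally {
                scheduleNext();
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    private synchronized void scheduleNext() {
        active = tasks.poll();
        if (active != null) {
            pool.execute(active); // any pool thread may pick this up
        }
    }

    // Simulates "deliveries" 0..messages-1 and records the order they were handled in.
    static List<Integer> runDemo(int messages) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        SerialDispatcher dispatcher = new SerialDispatcher(pool);
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(messages);
        for (int i = 0; i < messages; i++) {
            final int n = i;
            dispatcher.execute(() -> {
                seen.add(n);
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        return seen;
    }
}
```

Calling SerialDispatcher.runDemo(100) returns the numbers in submission order even though four pool threads are available, because only one task is in flight at any moment.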
As you can read from the “QueueingConsumer” doc:
As such, it is now safe to implement Consumer directly or to extend DefaultConsumer.
I never used QueueingConsumer, because it isn’t properly event-driven.
As you can see here:
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
    // nextDelivery() blocks here until the next message arrives.
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
}
A typical problem in this case is how to close the subscription. A common workaround is to send a specially tagged "close" message from the local host so that the blocked loop wakes up and exits. I don't like that approach very much.
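To make the workaround concrete, here is a rough stand-alone sketch of the "tagged close message" pattern, using a plain BlockingQueue in place of a real RabbitMQ queue. PoisonPillLoop, CLOSE and drainUntilClose are hypothetical names I made up; nothing here is RabbitMQ API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration: a blocking consume loop that can only be stopped
// by putting a special marker message into the queue.
class PoisonPillLoop {
    // Assumed marker value; a real application would pick its own convention.
    static final String CLOSE = "__CLOSE__";

    static List<String> drainUntilClose(BlockingQueue<String> queue) {
        List<String> handled = new ArrayList<>();
        try {
            while (true) {
                String message = queue.take(); // blocks until a message is available
                if (CLOSE.equals(message)) {
                    break; // the marker is the only way out of the loop
                }
                handled.add(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag and stop
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("first");
        queue.add("second");
        queue.add(CLOSE);
        System.out.println(drainUntilClose(queue));
    }
}
```

The drawback is visible in the sketch: the shutdown signal has to travel through the same queue as the data, so the consumer cannot stop until it has worked through everything queued ahead of the marker.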
If you extend DefaultConsumer instead, you can correctly close the subscription and the channel:
public class MyConsumer extends DefaultConsumer {...}
then
public static void main(String[] args) {
    MyConsumer consumer = new MyConsumer(channel);
    String consumerTag = channel.basicConsume(Constants.queue, false, consumer);
    System.out.println("press any key to terminate");
    System.in.read();
    channel.basicCancel(consumerTag);
    channel.close();
    ....
In conclusion, you shouldn't need to worry about message order in normal operation: when everything works correctly, the messages arrive in order. However, I don't think you can rely on it, because if something goes wrong the order can be lost. If you absolutely need to maintain message order, include a sequence number in each message so that the consumer can reconstruct the order on its side.
And you should extend DefaultConsumer.
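The sequence-number idea can be sketched roughly like this. It is only an illustration in plain Java: Resequencer and accept are hypothetical names, and I am assuming the producer stamps each message with a consecutive number starting at a known value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: buffers messages that arrive early and releases
// them strictly in sequence-number order.
class Resequencer {
    private final Map<Long, String> pending = new HashMap<>();
    private long nextExpected;

    Resequencer(long firstSequence) {
        this.nextExpected = firstSequence;
    }

    // Accepts one message and returns every message that can now be
    // delivered in order (possibly none, possibly several).
    List<String> accept(long sequence, String body) {
        List<String> ready = new ArrayList<>();
        if (sequence < nextExpected) {
            return ready; // already delivered earlier: treat as a duplicate and drop it
        }
        pending.put(sequence, body);
        while (pending.containsKey(nextExpected)) {
            ready.add(pending.remove(nextExpected));
            nextExpected++;
        }
        return ready;
    }
}
```

Inside handleDelivery() you would read the sequence number from the message (for example from a header or from the body, whichever convention you choose), call accept(), and process whatever list comes back. Note that this sketch buffers indefinitely if a number never arrives, so a real consumer would also need a timeout or gap policy.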