Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RabbitMQ Java Client Using DefaultConsumer vs QueueingConsumer

  1. DefaultConsumer
    My DemoConsumer inherits from DefaultConsumer.
    I have noticed that working this way handleDelivery() is invoked from ThreadPool.
    (printing Thread.currentThread().getName() I see pool-1-thread-1/2/3/4 eachtime.
    I have also tested it several times and saw that the order is saved.
    Just to make sure - since different threads call handle delivery - will it mess my order?

  2. QueueingConsumer
    All of the java tutorial use QueueingConsumer to consume messages.
    In the API Docs it is mentioned as a deprecated class.
    Should I change my code to inherit from DefaultConsumer use it? Is the tutorial outdated?

Thanks.

like image 848
Bick Avatar asked Apr 03 '14 14:04

Bick


People also ask

Does RabbitMQ run on Java?

The RabbitMQ Java client library allows Java and JVM-based applications to connect to and interact with RabbitMQ nodes. 5. x release series of this library require JDK 8, both for compilation and at runtime. On Android, this means only Android 7.0 or later versions are supported.

How to close RabbitMQ connection in java?

A connection can be closed via the RabbitMQ Management Interface. Enter the connection tab and press on the connection. Go to the bottom of the page and press Close this connection, followed by pressing Force Close.

What is RabbitMQ consumer tag?

A consumer tag is a consumer identifier which can be either client- or server-generated. To let RabbitMQ generate a node-wide unique tag, use a Channel#basicConsume override that doesn't take a consumer tag argument or pass an empty string for consumer tag and use the value returned by Channel#basicConsume.

Is RabbitMQ thread safe?

In Rabbitmq, Channel is not thread-safe. So it's better to use different threads to handle different channels.


1 Answers

Yes,DefaultConsumer uses an internal thread pool that can be changed. Using ExecutorService as:

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

Read http://www.rabbitmq.com/api-guide.html “Advanced Connection options”.

As you can read from the “QueueingConsumer” doc:

As such, it is now safe to implement Consumer directly or to extend DefaultConsumer.

I never used QueueingConsumer, because it isn’t properly event-driven.

As you can see here:

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
    /// here you are blocked, waiting the next message.
    String message = new String(delivery.getBody());
}

A typical problem in this case is how to close the subscription, and a common workaround is to send a tagged close message in local host. Actually I don’t like it so much.

If you extend DefaultConsumer instead, you can correctly close the subscription and the channel:

public class MyConsumer extends DefaultConsumer {...}

then

public static void main(String[] args) {
MyConsumer consumer = new MyConsumer (channel);
String consumerTag = channel.basicConsume(Constants.queue, false, consumer);
System.out.println("press any key to terminate");
System.in.read();
channel.basicCancel(consumerTag);
channel.close();
....

In conclusion, you shouldn’t worry about the message order because if all works correctly, the message order is correct, but I think you can’t assume it because if there is some problem, you can lose the message order. If you absolutely need to maintain message order, you should include a sequential tag to reconstruct the message order at the consumer side.

And you should extend DefaultConsumer.

like image 166
Gabriele Santomaggio Avatar answered Oct 18 '22 21:10

Gabriele Santomaggio