What's the difference between SimpleMessageListenerContainer and DirectMessageListenerContainer in Spring AMQP?

I checked the documentation pages for both. The one for SimpleMessageListenerContainer has almost no explanation of its inner workings, and the one for DirectMessageListenerContainer has the following explanation:

The SimpleMessageListenerContainer is not so simple. Recent changes to the rabbitmq java client has facilitated a much simpler listener container that invokes the listener directly on the rabbit client consumer thread. There is no txSize property - each message is acked (or nacked) individually.

I don't really understand what this means. It describes a "listener container that invokes the listener directly on the rabbit client consumer thread". If so, then how does SimpleMessageListenerContainer do the invocation?

I wrote a small application using DirectMessageListenerContainer and, just to see the difference, switched to SimpleMessageListenerContainer. As far as I can see, there was no difference on the RabbitMQ side. On the Java side, the differences were in the methods (SimpleMessageListenerContainer provides more) and in the logs (DirectMessageListenerContainer logged more).

I would like to know the scenarios in which to use each of them.

Mansur asked Jun 04 '19 06:06



2 Answers

The SimpleMessageListenerContainer (SMLC) has a dedicated thread for each consumer (the number is set by the concurrency) that polls an internal queue. When a new message arrives for a consumer on the client thread, it is put in the internal queue, and the consumer thread picks it up and invokes the listener. This was required with early versions of the client to provide multi-threading. With the newer client that is not a problem, so the DirectMessageListenerContainer can invoke the listener directly on the client thread (hence the name).
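The two hand-off models described above can be sketched with plain JDK threads. This is only an illustration under stated assumptions, not Spring AMQP code: the class `ListenerThreadingSketch`, its methods, and the thread names `client-thread` and `container-consumer` are all hypothetical stand-ins for the RabbitMQ client thread and the container's consumer thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ListenerThreadingSketch {

    // Hypothetical listener: reports which thread it ran on.
    static String listener(String message) {
        return message + " handled on " + Thread.currentThread().getName();
    }

    // SMLC-style: the client thread only enqueues the message;
    // a dedicated consumer thread takes it from the internal queue and invokes the listener.
    static String simpleStyle(String message) {
        BlockingQueue<String> internalQueue = new LinkedBlockingQueue<>();
        String[] result = new String[1];

        Thread consumerThread = new Thread(() -> {
            try {
                result[0] = listener(internalQueue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "container-consumer");
        Thread clientThread = new Thread(() -> internalQueue.add(message), "client-thread");

        consumerThread.start();
        clientThread.start();
        joinQuietly(clientThread);
        joinQuietly(consumerThread);
        return result[0];
    }

    // DMLC-style: the client thread invokes the listener itself, with no hand-off.
    static String directStyle(String message) {
        String[] result = new String[1];
        Thread clientThread = new Thread(() -> result[0] = listener(message), "client-thread");
        clientThread.start();
        joinQuietly(clientThread);
        return result[0];
    }

    private static void joinQuietly(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(simpleStyle("hello"));
        System.out.println(directStyle("hello"));
    }
}
```

In the first method the listener runs on the container's own thread, and in the second it runs on the thread that delivered the message, which is the distinction the answer draws.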

There are a few other differences aside from txSize.

See Choosing a Container.

Gary Russell answered Sep 30 '22 21:09


In the DirectMessageListenerContainer, some of the logic is moved into the AMQP client implementation, whereas in the SimpleMessageListenerContainer it lives in the listener container itself.

This is what the Javadoc for setTxSize() in SimpleMessageListenerContainer says:

/**
 * Tells the container how many messages to process in a single transaction (if the channel is transactional). For
 * best results it should be less than or equal to {@link #setPrefetchCount(int) the prefetch count}. Also affects
 * how often acks are sent when using {@link AcknowledgeMode#AUTO} - one ack per txSize. Default is 1.
 * @param txSize the transaction size
 */

The container sends one ack each time txSize messages have been processed. This is controlled in the method:

private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { //NOSONAR

    Channel channel = consumer.getChannel();

    // Handle up to txSize messages in this pass
    for (int i = 0; i < this.txSize; i++) {

        logger.trace("Waiting for message from consumer.");
        Message message = consumer.nextMessage(this.receiveTimeout);
        // ... (rest of the method elided in the original)

In the newer implementation (DirectMessageListenerContainer), each message is acked (or nacked) individually and directly on the client consumer thread. How those acknowledgments reach RabbitMQ depends on the configured acknowledge mode and on whether the channel is transacted.
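To make the difference in ack frequency concrete, here is a minimal sketch that only counts acknowledgments. It is a simulation under stated assumptions, not the container's actual code: the class `AckBatchingSketch` and both methods are hypothetical, and an empty in-memory queue stands in for a receive timeout.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class AckBatchingSketch {

    // SMLC-style: process up to txSize messages per pass, then send one ack for the batch.
    static int acksWithTxSize(List<String> messages, int txSize) {
        if (txSize < 1) {
            throw new IllegalArgumentException("txSize must be at least 1");
        }
        Queue<String> pending = new ArrayDeque<>(messages);
        int acks = 0;
        while (!pending.isEmpty()) {
            int processed = 0;
            for (int i = 0; i < txSize; i++) {
                String message = pending.poll(); // null stands in for a receive timeout
                if (message == null) {
                    break;
                }
                processed++;
            }
            if (processed > 0) {
                acks++; // one ack covers every message handled in this pass
            }
        }
        return acks;
    }

    // DMLC-style: every message is acked individually.
    static int acksPerMessage(List<String> messages) {
        int acks = 0;
        for (String ignored : messages) {
            acks++;
        }
        return acks;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println("batched acks: " + acksWithTxSize(messages, 2));
        System.out.println("per-message acks: " + acksPerMessage(messages));
    }
}
```

With five messages and a batch size of 2, the batched variant sends three acks (for passes of 2, 2 and 1 messages), while the per-message variant sends five.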

Aditya answered Sep 30 '22 20:09