How do I handle RabbitMQ Consumer Cancellation Notifications when using a Spring ChannelAwareMessageListener?

Newbie to RabbitMQ and new to Java.

I'm attempting to write a listener that uses manual acks and handles consumer cancellation notifications through the Java Spring AMQP abstraction. Can I accomplish both tasks with the Spring abstraction?

I want to write a listener that pulls messages from a queue and processes each one (maybe writing to a database or something). I plan to use manual acknowledgements so that if processing a message fails or can't be completed for some reason, I can reject and requeue it. So far, it looks like I have to use a ChannelAwareMessageListener in order to manually ack/nack/reject with Spring AMQP.

I realize that I should be handling Consumer Cancellation Notifications (CCN) from RabbitMQ, but with the ChannelAwareMessageListener I don't see a way to code for this. The only way I see to handle a CCN is to use the lower-level Java client API: call channel.basicConsume() and pass a new DefaultConsumer instance, which lets you handle both message deliveries and cancels.

I also don't see how I would set the clientProperties on the ConnectionFactory (to tell the broker I can handle the CCN), since I am getting the factory from a bean in the config.

My pseudocode for the listener and the creation of the container is below.

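import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
// Spring AMQP 1.x package; later versions moved this interface to ...rabbit.listener.api
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

public class MyChannelAwareListener implements ChannelAwareMessageListener
{
    @Override
    public void onMessage(Message message, Channel channel) throws Exception
    {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        boolean msgProcessed = processMessage(message);

        if (msgProcessed)
            channel.basicAck(deliveryTag, false);    // ack this delivery only
        else
            channel.basicReject(deliveryTag, true);  // reject and requeue
    }

    // Placeholder for the real work (e.g. writing to a database).
    private boolean processMessage(Message message)
    {
        return true;
    }
}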

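import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ListenerMain  // class name is illustrative
{
    public static void main(String[] args) throws Exception
    {
        // MY_CONTEXT_PATH: classpath location of the Spring XML config (defined elsewhere)
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(MY_CONTEXT_PATH);
        ConnectionFactory rabbitConnectionFactory =
            ctx.getBean("rabbitConnectionFactory", ConnectionFactory.class);

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory);
        container.setQueueNames("myQueue");
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new MyChannelAwareListener());
        container.start();
    }
}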
SamAxe asked Jun 12 '13 20:06


1 Answer

To set the client properties, call the setClientProperties method on the ConnectionFactory (assuming this ConnectionFactory is the class from the RabbitMQ Java library, not a wrapper around it). The method expects a Map<String, Object> holding the client's properties and its capabilities. The following lines are the default values built inside the RabbitMQ Java library; note that consumer_cancel_notify is already set to true among them:

Map<String,Object> props = new HashMap<String, Object>();
props.put("product", LongStringHelper.asLongString("RabbitMQ"));
props.put("version", LongStringHelper.asLongString(ClientVersion.VERSION));
props.put("platform", LongStringHelper.asLongString("Java"));
props.put("copyright", LongStringHelper.asLongString(Copyright.COPYRIGHT));
props.put("information", LongStringHelper.asLongString(Copyright.LICENSE));

Map<String, Object> capabilities = new HashMap<String, Object>();
capabilities.put("publisher_confirms", true);
capabilities.put("exchange_exchange_bindings", true);
capabilities.put("basic.nack", true);
capabilities.put("consumer_cancel_notify", true);

props.put("capabilities", capabilities);
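If you build your own properties map instead of relying on the defaults, the capabilities table needs to keep the consumer_cancel_notify entry. The following is a minimal sketch using only java.util; the class and method names (ClientPropertiesSketch, withConsumerCancelNotify) are made up for illustration and are not part of the RabbitMQ client. The resulting map is what you would pass to setClientProperties.

```java
import java.util.HashMap;
import java.util.Map;

public class ClientPropertiesSketch {

    /**
     * Returns a copy of the given client properties whose "capabilities"
     * table advertises consumer_cancel_notify. The input map is not modified.
     * (Hypothetical helper, not a RabbitMQ client method.)
     */
    public static Map<String, Object> withConsumerCancelNotify(Map<String, Object> base) {
        Map<String, Object> props = new HashMap<String, Object>(base);

        // Copy any existing capabilities so the caller's table stays untouched.
        Map<String, Object> capabilities = new HashMap<String, Object>();
        Object existing = base.get("capabilities");
        if (existing instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) existing).entrySet()) {
                capabilities.put(String.valueOf(e.getKey()), e.getValue());
            }
        }
        capabilities.put("consumer_cancel_notify", true);

        props.put("capabilities", capabilities);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> caps = new HashMap<String, Object>();
        caps.put("basic.nack", true);

        Map<String, Object> base = new HashMap<String, Object>();
        base.put("platform", "Java");
        base.put("capabilities", caps);

        Map<String, Object> props = withConsumerCancelNotify(base);
        System.out.println(props.get("capabilities"));

        // With the RabbitMQ Java client on the classpath, the map would then be
        // handed over via factory.setClientProperties(props).
    }
}
```

Copying the map rather than mutating it keeps whatever defaults you started from intact, which matters if the same base map is shared between several factories.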

As for managing acks and consumer cancellation, I am not sure how to do it with the Spring AMQP abstraction. It is, however, perfectly doable with channel.basicConsume, where the Consumer you pass in receives a callback for each scenario (delivery, cancellation, shutdown). See the Java client Javadoc:

http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.5/rabbitmq-java-client-javadoc-3.1.5/
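The basicConsume approach can be sketched as follows with the plain RabbitMQ Java client. This is only a sketch under assumptions: it needs the RabbitMQ client library and a reachable broker, the host "localhost" and the process helper are placeholders, and the queue name "myQueue" is taken from the question. It is not the Spring AMQP way of doing it.

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class ManualAckConsumerSketch {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumption: local broker, default credentials

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // autoAck = false: every delivery must be acked or rejected explicitly.
        channel.basicConsume("myQueue", false, new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                long deliveryTag = envelope.getDeliveryTag();
                if (process(body)) {
                    getChannel().basicAck(deliveryTag, false);   // ack this delivery only
                } else {
                    getChannel().basicReject(deliveryTag, true); // reject and requeue
                }
            }

            @Override
            public void handleCancel(String consumerTag) throws IOException {
                // Called when the broker cancels the consumer, e.g. the queue was deleted.
                System.err.println("Consumer " + consumerTag + " was cancelled by the broker");
            }
        });
    }

    // Placeholder for the real processing; returns false to trigger a requeue.
    static boolean process(byte[] body) {
        return body.length > 0;
    }
}
```

Be aware that rejecting with requeue set to true puts the message straight back on the queue, so a message that always fails will be redelivered repeatedly unless you add a retry limit or a dead-letter exchange.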

Hope this helps!

hveiga answered Oct 24 '22 20:10