Newbie to RabbitMQ and new to Java.
I'm attempting to write a listener that uses manual acks and handles consumer cancellation notifications through the Spring AMQP abstraction for Java. Can I accomplish both with the Spring abstraction?
I want to write a listener that pulls messages from a queue and processes each one (maybe writing to a database or something). I planned on using manual acknowledgements so that if processing fails or can't be completed for some reason, I can reject and requeue the message. So far I think I've found that in order to manually ack/nack/reject using Spring AMQP I have to use a ChannelAwareMessageListener.
I realize that I should be handling Consumer Cancellation Notifications (CCN) from RabbitMQ; however, with the ChannelAwareMessageListener I don't really see a way to code for this. The only way I see to handle CCN is to write code against the lower-level Java client API, calling channel.basicConsume() and passing a new DefaultConsumer instance, which lets you handle both message delivery and cancels.
I also don't see how I would set the clientProperties on the ConnectionFactory (to tell the broker I can handle the CCN), since I am getting the factory from a bean in config.
My pseudocode for the listener and the creation of the container is below.
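import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
// Spring AMQP 1.x package; in 2.x the interface lives in org.springframework.amqp.rabbit.listener.api
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

public class MyChannelAwareListener implements ChannelAwareMessageListener
{
    @Override
    public void onMessage(Message message, Channel channel) throws Exception
    {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // processMessage is pseudocode: it returns true when the message was handled
        boolean msgProcessed = processMessage(message);
        if (msgProcessed)
            channel.basicAck(deliveryTag, false);    // ack this delivery only
        else
            channel.basicReject(deliveryTag, true);  // reject and requeue
    }
}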
public static void main(String[] args) throws Exception
{
    ConnectionFactory rabbitConnectionFactory;
    ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(MY_CONTEXT_PATH);
    rabbitConnectionFactory = (ConnectionFactory) ctx.getBean("rabbitConnectionFactory");
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    MyChannelAwareListener listener = new MyChannelAwareListener();
    container.setMessageListener(listener);
    container.setQueueNames("myQueue");
    container.setConnectionFactory(rabbitConnectionFactory);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    container.start();
}
channel.queueDeclare(queue, false, true, false, null);
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleCancel(String consumerTag) throws IOException {
        // consumer has been cancelled unexpectedly
    }
};
channel.basicConsume(queue, consumer);
To set the client properties, use the setClientProperties method on the ConnectionFactory (assuming this ConnectionFactory is the object from the RabbitMQ Java library). The method expects a Map<String, Object> containing the client's properties and capabilities. The following lines are the default values inside the RabbitMQ Java library:
Map<String,Object> props = new HashMap<String, Object>();
props.put("product", LongStringHelper.asLongString("RabbitMQ"));
props.put("version", LongStringHelper.asLongString(ClientVersion.VERSION));
props.put("platform", LongStringHelper.asLongString("Java"));
props.put("copyright", LongStringHelper.asLongString(Copyright.COPYRIGHT));
props.put("information", LongStringHelper.asLongString(Copyright.LICENSE));
Map<String, Object> capabilities = new HashMap<String, Object>();
capabilities.put("publisher_confirms", true);
capabilities.put("exchange_exchange_bindings", true);
capabilities.put("basic.nack", true);
capabilities.put("consumer_cancel_notify", true);
props.put("capabilities", capabilities);
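If you only want to make sure the consumer_cancel_notify capability is present without retyping all the defaults, one option is to copy an existing properties map and merge the flag in. The following is a minimal sketch using only java.util; the class name ClientProps, the helper withConsumerCancelNotify, and the "MyApp" value are hypothetical names made up for illustration, not part of the RabbitMQ client.

```java
import java.util.HashMap;
import java.util.Map;

class ClientProps {
    /**
     * Returns a copy of the given client properties with the
     * consumer_cancel_notify capability set to true. Any capabilities
     * already present in the input are preserved. (Hypothetical helper.)
     */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> withConsumerCancelNotify(Map<String, Object> base) {
        Map<String, Object> props = new HashMap<>(base);
        Map<String, Object> capabilities = new HashMap<>();
        Object existing = props.get("capabilities");
        if (existing instanceof Map) {
            // keep whatever capabilities were already declared
            capabilities.putAll((Map<String, Object>) existing);
        }
        capabilities.put("consumer_cancel_notify", true);
        props.put("capabilities", capabilities);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("product", "MyApp"); // hypothetical value for illustration
        Map<String, Object> props = withConsumerCancelNotify(base);
        System.out.println(props.get("capabilities"));
    }
}
```

The resulting map is what you would hand to setClientProperties on the RabbitMQ ConnectionFactory. The input map is not modified, so the defaults stay untouched if you need them elsewhere.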
As for managing acks and consumer cancellation, I am not sure how to do it with the Spring AMQP abstraction. It is perfectly doable with channel.basicConsume, though: the Consumer you pass in receives a callback for each scenario, including deliveries and cancellations:
http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.5/rabbitmq-java-client-javadoc-3.1.5/
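To make the callback flow concrete without needing a broker or the client jar, here is a rough sketch that models it in plain Java. AckChannel is a hypothetical stand-in for the two com.rabbitmq.client.Channel methods involved (basicAck and basicReject, with the same parameter order), and SketchConsumer and RecordingChannel are likewise invented for illustration. With the real client you would put the same logic into the handleDelivery and handleCancel overrides of a DefaultConsumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class ManualAckSketch {
    /** Hypothetical stand-in for the two Channel methods used here. */
    interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple);
        void basicReject(long deliveryTag, boolean requeue);
    }

    /** Models a consumer with a delivery callback and a cancel callback. */
    static class SketchConsumer {
        private final AckChannel channel;
        private final Predicate<byte[]> processor;
        private volatile boolean cancelled;

        SketchConsumer(AckChannel channel, Predicate<byte[]> processor) {
            this.channel = channel;
            this.processor = processor;
        }

        void handleDelivery(long deliveryTag, byte[] body) {
            boolean ok;
            try {
                ok = processor.test(body);
            } catch (RuntimeException e) {
                ok = false; // treat a processing failure as "not processed"
            }
            if (ok) {
                channel.basicAck(deliveryTag, false);   // ack this delivery only
            } else {
                channel.basicReject(deliveryTag, true); // reject and requeue
            }
        }

        void handleCancel(String consumerTag) {
            // the broker cancelled the consumer; the application can react here
            cancelled = true;
        }

        boolean isCancelled() {
            return cancelled;
        }
    }

    /** Records calls so the sketch can be exercised without a broker. */
    static class RecordingChannel implements AckChannel {
        final List<String> calls = new ArrayList<>();

        @Override
        public void basicAck(long deliveryTag, boolean multiple) {
            calls.add("ack:" + deliveryTag);
        }

        @Override
        public void basicReject(long deliveryTag, boolean requeue) {
            calls.add("reject:" + deliveryTag + ":requeue=" + requeue);
        }
    }

    public static void main(String[] args) {
        RecordingChannel ch = new RecordingChannel();
        // hypothetical rule: an empty body counts as a processing failure
        SketchConsumer consumer = new SketchConsumer(ch, body -> body.length > 0);
        consumer.handleDelivery(1L, "ok".getBytes());
        consumer.handleDelivery(2L, new byte[0]);
        consumer.handleCancel("ctag-1");
        System.out.println(ch.calls + " cancelled=" + consumer.isCancelled());
    }
}
```

Keeping the ack/reject decision in one place makes it easy to see that every delivery ends in exactly one of the two calls, which matters because an unacknowledged delivery stays outstanding on the channel.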
Hope this helps!