Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Ensure that AMQP exchange exists before publishing a message to it [closed]

Problem

A Java application uses RabbitMQ and client library com.rabbitmq:amqp-client to connect to it. The application declares an AMQP exchange during initialization and periodically publishes messages to it.

If that exchange gets removed for some reason the application cannot publish messages to it and the AMQP channel gets automatically closed by the library. So any subsequent publishes (even after the exchange is re-created) fail with such exception:

Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'logs' in vhost '/', class-id=60, method-id=40)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:296)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:648)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:631)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:622)

How can I guarantee that the exchange does exist before publish? I see the following options.

Possible solutions

1. Try to re-declare exchange before each publish explicitly

Thanks to the fact that exchangeDeclare is idempotent and has no effect if the exchange is already in place I could explicitly declare the exchange before any publish:

channel.exchangeDeclare(EXCHANGE_NAME, "fanout", false, true, null);
channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message);

The issue with this code is that it looks stupid because most of the time the exchange is in place and the declaration is just redundant.

Also I'm still be in trouble if the exchange is deleted exactly between declaration and publish.

2. Verify that exchange does exist via exchangeDeclarePassive

I could use exchangeDeclarePassive to check that the exchange exists before publish but the following obvious method doesn't work:

private static void ensureExchangeExists(Channel channel) throws IOException {
    try {
        channel.exchangeDeclarePassive(EXCHANGE_NAME);
    } catch (Exception e) {
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", false, true, null);
    }
}

The problem is that if the exchange is missing, exchangeDeclarePassive throws the exception and the channel is closed automatically by the library. So the code in catch block cannot declare the exchange (because it tries to perform an operation on the closed channel).

So I can no longer use single AMQP channel and have to manage them in some way.

Also I'm still in trouble if the exchange is removed between declaration and publish.

3. Catch exception upon publish

You cannot just wrap call channel.basicPublish with a try/catch block because if the exchange is missing then no exception is thrown. This answer explains what actually happens in this situation.

But you can register a ShutdownListener that is able to detect when a channel is closed, investigate the reason (via cause.isInitiatedByApplication()/cause.getReason()) and do the necessary action.

Question

The question is what is the best way to ensure that AMQP exchange exists before publishing a message to it and why?

like image 635
wheleph Avatar asked Aug 17 '15 16:08

wheleph


1 Answers

IME, option #1 is the close to the better option.

What I've found works best, is to have the code for a given publisher encapsulated in a way that lets me declare / re-declare the queue the first time I create the object instance. Then I can re-use the same object instance to publish messages without having to re-declare the exchange again.

If I create a new instance of the object, it will re-declare the exchange. But re-using the same instance prevents that from happening.

This strategy has worked well for me.

the other option is to pre-define and pre-declare your exchanges, queues and bindings at the app startup. that way you don't have to worry about it anymore. the downside here, is you have pre-determine all exchanges, queues and bindings... which works in some apps but not in others.

like image 152
Derick Bailey Avatar answered Sep 18 '22 17:09

Derick Bailey