
Using RabbitMQ (Java client), is there a way to determine if network connection is closed during consume?

I'm using RabbitMQ on RHEL 5.3 using the Java client. I have 2 nodes (machines). Node1 is consuming messages from a queue on Node2 using the Java helper class QueueingConsumer.

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("MyQueueOnNode2", noAck, consumer);
while (true)
{
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   // ... process message: delivery.getBody()
}

If the network interface is brought down on Node1 or Node2 (e.g. ifconfig eth1 down), the client above never learns that the network is gone. Does RabbitMQ provide some configuration on the Java client that can be used to detect that the connection has gone away? Shutting down the RabbitMQ server on Node2 triggers a ShutdownSignalException, which can be caught so the app can go into a reconnect loop. But bringing down the interface doesn't raise any exception, so the code waits forever on consumer.nextDelivery().

I've also tried the timeout version of this call, e.g.:

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("MyQueueOnNode2", noAck, consumer);
int timeout_ms = 30000;
while (true)
{
   QueueingConsumer.Delivery delivery = consumer.nextDelivery(timeout_ms);
   if (delivery == null)
   {
      if (channel.isOpen() == false)             // Seems to always return true
      { throw new ShutdownSignalException(); }
   }
   else
   {
     // ... process message: delivery.getBody()
   }
}

But channel.isOpen() appears to always return true, even though the interface is down. I assume registering a ShutdownListener on the connection will yield the same result, but I haven't tried that yet.

Is there a way to configure some sort of heartbeat, or do you just have to write custom lease logic (e.g. "I'm here now") in order to get this to work?

MItch Branting asked Mar 18 '10 18:03




2 Answers

In general, you're much better off posting questions about RabbitMQ on the rabbitmq-discuss mailing list. We don't tend to track questions asked elsewhere.

There is a heartbeat that you can configure, though it is off by default. You could also turn on TCP keep-alive. Either call setRequestedHeartbeat on the ConnectionFactory before creating a new connection, or subclass ConnectionFactory, override the configureSocket method, and call socket.setKeepAlive(true). Both should result in the connection noticing when the network dies.
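To make the keep-alive option concrete, here is a minimal sketch of the socket configuration step using only java.net.Socket, so it runs without the RabbitMQ client on the classpath. The class name KeepAliveSketch and the helper enableKeepAlive are hypothetical names chosen for illustration; the helper stands in for what the body of a configureSocket override might do. How the override hooks into ConnectionFactory depends on the client version you run, so treat this as an assumption-laden illustration rather than the library's own code.

```java
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;

// Hypothetical illustration class; not part of the RabbitMQ client.
final class KeepAliveSketch {

    // The step the answer describes: turn on SO_KEEPALIVE for the socket
    // that will carry the connection. In a ConnectionFactory subclass this
    // call would sit inside the configureSocket override.
    static void enableKeepAlive(Socket socket) throws SocketException {
        socket.setKeepAlive(true);
    }

    public static void main(String[] args) throws IOException {
        // An unconnected socket is enough to show the option being set;
        // a real client would connect it to the broker afterwards.
        try (Socket socket = new Socket()) {
            enableKeepAlive(socket);
            System.out.println("SO_KEEPALIVE enabled: " + socket.getKeepAlive());
        }
    }
}
```

One thing to keep in mind: the keep-alive probe timing is controlled by the operating system, not by Java. On Linux the default idle time before the first probe (net.ipv4.tcp_keepalive_time) is 7200 seconds, so unless that is tuned, a dead link may go unnoticed for a long time. The heartbeat set through setRequestedHeartbeat is negotiated per connection and is likely the more practical choice when detection within seconds matters.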

Matthew Sackman answered Oct 18 '22 16:10



Regarding the isOpen method, that is well described in the docs: http://www.rabbitmq.com/api-guide.html#shutdown-atomicity

Regarding the shutdown: by shutting down Node1 or Node2 you mean the application, right, not the RabbitMQ server itself? Why would one application need to know whether another application has disconnected from the message broker? That's not the point of messaging.

The only thing you can do is send messages with the 'mandatory' flag set. That tells the RabbitMQ server that you expect the message to be routed to at least one queue (whether through a direct exchange or a topic/fanout exchange). If the message cannot be routed to any queue, it is returned to your channel and forwarded to the registered ReturnListener.

momania answered Oct 18 '22 18:10
