I implemented a consumer, which will reconnect to broker automatically after a while if underlying connection is closed. My case is as below:
Stop RabbitMQ server, consumer will show an exception:
com.rabbitmq.client.ShutdownSignalException: connection error; reason: {#method(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0), null, ""}.
And then consumer will sleep 60 seconds before reconnect.
In this case, all messages published before reconnect will be lost. Also I performed another experiment.
Note:The QOS of consumer is 1. I have researched RabbitMQ several days, in my understanding, consumer should get the message published before reconnect. Pls help(I ran test based on windows rabbitMQ).
Below is the PUBLISHER:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.getHost());
connection = factory.newConnection();
Channel channel = connection.createChannel();
channel = conn.createChannel();
// declare a 'topic' type of exchange
channel.exchangeDeclare(exchangeName, "topic");
// Content-type "application/octet-stream", deliveryMode 2
// (persistent), priority zero
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_BASIC, message);
connection.close();
And the CONSUMER is as below:
@Override
public void consume(final String exchangeName, final String queueName, final String routingKey,
final int qos) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.getHost());
while (true) {
Connection connection = null;
try {
connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "topic");
// declare a durable, non-exclusive, non-autodelete queue.
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// distribute workload among all consumers, consumer will
// pre-fetch
// {qos}
// messages to local buffer.
channel.basicQos(qos);
logger.debug(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
// disable auto-ack. If enable auto-ack, RabbitMQ delivers a
// message to
// the customer it immediately removes it from memory.
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
try {
RabbitMessageConsumer.this.consumeMessage(delivery);
}
catch (Exception e) {
// the exception shouldn't affect the next message
logger.info("[IGNORE]" + e.getMessage());
}
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
catch (Exception e) {
logger.warn(e);
}
if (autoReconnect) {
this.releaseConn(connection);
logger.info("[*] Will try to reconnect to remote host(" + this.getHost() + ") in "
+ this.reconnectInterval / 1000 + " seconds.");
Thread.sleep(this.getReconnectInterval());
}
else
break;
}
}
private void releaseConn(Connection conn) {
try {
if (conn != null)
conn.close();
}
catch (Exception e) {
// simply ignore this exception
}
}
As it is a 'topic' exchange, no queue is declared at PUBLISHER. However at step#3 of 1st test, the durable queue has been declared, and the message is durable as well. I don't understand why message will be lost before reconnect.
Oh, I found the cause...The message and queue are certainly durable, however the exchange isn't durable. As exchange isn't durable, the binding information between queue and exchange will be lost between RabbitMQ broker restart.
Now I declare the exchange as durable, consumer can get message which published before consumer restart and after broker restart.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With