
RabbitMQ lose message before consumer reconnect

Tags: rabbitmq, amqp

I implemented a consumer that automatically reconnects to the broker after a delay when the underlying connection is closed. My test case is as follows:

  1. Launch the RabbitMQ server successfully.
  2. Launch the consumer successfully.
  3. Publish a message; the consumer receives it successfully.
  4. Stop the RabbitMQ server. The consumer shows an exception:

    com.rabbitmq.client.ShutdownSignalException: connection error; reason: {#method(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0), null, ""}.

    The consumer then sleeps 60 seconds before reconnecting.

  5. Launch the RabbitMQ server again.
  6. Publish a second message successfully; the 'list_queues' command reports 0.
  7. After 60 seconds, the consumer connects to RabbitMQ again, but it does not receive the message published at step#6.
  8. Publish a 3rd message; the consumer receives it successfully.

In this case, every message published before the consumer reconnects is lost. I also ran another experiment:

  1. Launch RabbitMQ and publish a message successfully (no consumer process is running).
  2. Stop RabbitMQ, then restart it.
  3. Launch the consumer process; it receives the message published at step#1 successfully.

Note: The consumer's QoS is 1. I have been researching RabbitMQ for several days, and in my understanding the consumer should receive the messages published before it reconnects. Please help. (I ran the tests against RabbitMQ on Windows.)

Below is the PUBLISHER:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.getHost());
connection = factory.newConnection();
Channel channel = connection.createChannel();
// declare a 'topic' type of exchange
channel.exchangeDeclare(exchangeName, "topic");
// Content-type "application/octet-stream", deliveryMode 2
// (persistent), priority zero
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_BASIC, message);
connection.close();

And the CONSUMER is as below:

@Override
public void consume(final String exchangeName, final String queueName, final String routingKey,
        final int qos) throws IOException, InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(this.getHost());

    while (true) {
        Connection connection = null;
        try {
            connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(exchangeName, "topic");
            // declare a durable, non-exclusive, non-autodelete queue.
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            // distribute workload among all consumers; each consumer will
            // pre-fetch at most {qos} messages to its local buffer.
            channel.basicQos(qos);

            logger.debug(" [*] Waiting for messages. To exit press CTRL+C");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            // disable auto-ack. With auto-ack enabled, RabbitMQ removes a
            // message from memory as soon as it delivers it to the consumer.
            boolean autoAck = false;
            channel.basicConsume(queueName, autoAck, consumer);

            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                try {
                    RabbitMessageConsumer.this.consumeMessage(delivery);
                }
                catch (Exception e) {
                    // the exception shouldn't affect the next message
                    logger.info("[IGNORE]" + e.getMessage());
                }
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
        catch (Exception e) {
            logger.warn(e);
        }

        if (autoReconnect) {
            this.releaseConn(connection);
            logger.info("[*] Will try to reconnect to remote host(" + this.getHost() + ") in "
                    + this.reconnectInterval / 1000 + " seconds.");
            Thread.sleep(this.getReconnectInterval());
        }
        else
            break;
    }
}

private void releaseConn(Connection conn) {
    try {
        if (conn != null)
            conn.close();
    }
    catch (Exception e) {
        // simply ignore this exception
    }
}

As it is a 'topic' exchange, the PUBLISHER does not declare a queue. However, by step#3 of the 1st test the durable queue has already been declared, and the message is persistent as well. I don't understand why messages published before the reconnect are lost.

Ramon asked Dec 03 '22 22:12


1 Answer

I found the cause. The message and the queue are indeed durable, but the exchange is not. Because the exchange isn't durable, the binding between the queue and the exchange is lost when the RabbitMQ broker restarts, so a message published before the consumer reconnects and re-creates the binding has no queue to be routed to.

Now that I declare the exchange as durable, the consumer receives messages that were published after the broker restart and before the consumer reconnects.
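
To make the reasoning concrete, here is a minimal sketch of the two cases. It is a toy in-memory model, not RabbitMQ's implementation and not the client API: the class `ToyBroker`, its methods, and the names "ex", "q" and "msg" are all hypothetical, and routing keys are left out. It only assumes what is described above, namely that a non-durable exchange and its bindings disappear on restart while the durable queue survives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a broker restart. NOT RabbitMQ's real implementation. */
final class ToyBroker {
    // exchange name -> durable flag
    private final Map<String, Boolean> exchanges = new HashMap<>();
    // exchange name -> names of queues bound to it (routing keys omitted)
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> stored messages (every queue in this model is durable)
    private final Map<String, List<String>> queues = new HashMap<>();

    void exchangeDeclare(String exchange, boolean durable) {
        exchanges.putIfAbsent(exchange, durable);
    }

    void queueDeclare(String queue) {
        queues.putIfAbsent(queue, new ArrayList<>());
    }

    void queueBind(String queue, String exchange) {
        if (!queues.containsKey(queue) || !exchanges.containsKey(exchange)) {
            throw new IllegalArgumentException("declare queue and exchange first");
        }
        List<String> bound = bindings.computeIfAbsent(exchange, k -> new ArrayList<>());
        if (!bound.contains(queue)) {
            bound.add(queue);
        }
    }

    /** Routes the message to every bound queue; returns how many queues got it. */
    int publish(String exchange, String message) {
        List<String> bound = bindings.getOrDefault(exchange, new ArrayList<>());
        for (String queue : bound) {
            queues.get(queue).add(message);
        }
        return bound.size();
    }

    int depth(String queue) {
        return queues.getOrDefault(queue, new ArrayList<>()).size();
    }

    /** Simulated restart: non-durable exchanges vanish, and their bindings with them. */
    void restart() {
        exchanges.values().removeIf(durable -> !durable);
        bindings.keySet().retainAll(exchanges.keySet());
    }

    /** Replays steps 2 to 6 of the first test and returns the queue depth afterwards. */
    static int messagesAfterRestart(boolean durableExchange) {
        ToyBroker broker = new ToyBroker();
        // Consumer starts: declares exchange and queue, then binds them.
        broker.exchangeDeclare("ex", durableExchange);
        broker.queueDeclare("q");
        broker.queueBind("q", "ex");
        // Broker is stopped and started again while the consumer sleeps.
        broker.restart();
        // Publisher re-declares the exchange and publishes before the consumer is back.
        broker.exchangeDeclare("ex", durableExchange);
        broker.publish("ex", "msg");
        return broker.depth("q");
    }

    public static void main(String[] args) {
        System.out.println("non-durable exchange: " + messagesAfterRestart(false)); // prints 0
        System.out.println("durable exchange: " + messagesAfterRestart(true));      // prints 1
    }
}
```

With the real Java client, the corresponding change would presumably be to use the `exchangeDeclare(String exchange, String type, boolean durable)` overload with `durable` set to `true` in both PUBLISHER and CONSUMER, instead of the two-argument call shown in the question.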

Ramon answered Dec 25 '22 23:12