Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RabbitMQ. Java client. Is it possible to acknowledge message not on the same thread it was received?

I want to fetch several messages, handle them and ack them all together after that. So basically I receive a message, put it in some queue and continue receiving messages from rabbit. Different thread will monitor this queue with received messages and process them when amount is sufficient. All I've been able to found about ack contains examples only for one message which processed on the same thread. Like this(from official docs):

channel.basicQos(1);

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};

And also documentation says this:

Channel instances must not be shared between threads. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads. While some operations on channels are safe to invoke concurrently, some are not and will result in incorrect frame interleaving on the wire.

So I'm confused here. If I'm acking some message and at the same time the channel is receiving another message from rabbit, is it considered to be two operations at the time? It seems to me like yes.

I've tried to acknowledge message on the same channel from different thread and it seems to work, but documentation says that I should not share channels between threads. So I've tried to do acknowledgment on different thread with different channel, but it fails, because delivery tag is unknown for this channel.

Is it possible to acknowledge message not on the same thread it was received?

UPD Example piece of code of what I want. It's in scala, but I think it's straightforward.

 case class AmqpMessage(envelope: Envelope, msgBody: String)

    val queue = new ArrayBlockingQueue[AmqpMessage](100)

    val consumeChannel = connection.createChannel()
    consumeChannel.queueDeclare(queueName, true, false, true, null)
    consumeChannel.basicConsume(queueName, false, new DefaultConsumer(consumeChannel) {
      override def handleDelivery(consumerTag: String,
                                  envelope: Envelope,
                                  properties: BasicProperties,
                                  body: Array[Byte]): Unit = {
        queue.put(new AmqpMessage(envelope, new String(body)))
      }
    })

    Future {
      // this is different thread
      val channel = connection.createChannel()
      while (true) {
        try {
          val amqpMessage = queue.take()
          channel.basicAck(amqpMessage.envelope.getDeliveryTag, false) // doesn't work
          consumeChannel.basicAck(amqpMessage.envelope.getDeliveryTag, false) // works, but seems like not thread safe
        } catch {
          case e: Exception => e.printStackTrace()
        }
      }
    }
like image 441
Artem Malinko Avatar asked Nov 08 '22 13:11

Artem Malinko


1 Answers

Although the documentation is pretty restrictive, some operations on channels are safe to invoke concurrently. You may ACK messages in the different thread as long as consuming and acking are the only actions you do on the channel.

See this SO question, which deals with the same thing:

RabbitMQ and channels Java thread safety

like image 194
David Siro Avatar answered Nov 14 '22 22:11

David Siro