
How to consume one message?

Tags: java, rabbitmq

With the example from the RabbitMQ tutorial, the consumer receives all messages from the queue at once. How can I consume a single message and then exit?

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);

while (true) {
  QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  String message = new String(delivery.getBody());
  System.out.println(" [x] Received '" + message + "'");
}
jBee asked Apr 24 '15 07:04





2 Answers

Call basicQos(1) so that the broker delivers at most one unacknowledged message at a time, and disable auto-ack so that you acknowledge each message explicitly.

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Prefetch limit: at most one unacknowledged message is delivered at a time.
channel.basicQos(1);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("[*] Waiting for messages. To exit press CTRL+C");

QueueingConsumer consumer = new QueueingConsumer(channel);
// This two-argument overload consumes with explicit acknowledgement (no auto-ack).
channel.basicConsume(QUEUE_NAME, consumer);
while (true) {
    // Blocks until the next message is delivered.
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    // Number of messages still ready in the queue.
    int n = channel.queueDeclarePassive(QUEUE_NAME).getMessageCount();
    System.out.println(n);
    if (delivery != null) {
        byte[] bs = delivery.getBody();
        System.out.println(new String(bs));
        // Acknowledge only this message; the broker may then deliver the next one.
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        // To stop after a single message, break out of the loop here.
    }
}

Hope it helps!
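QueueingConsumer was deprecated in version 4 of the Java client and removed in version 5, so with a current client the same idea is usually written with a callback (a DefaultConsumer subclass or a DeliverCallback). A minimal, broker-free sketch of the hand-off such a callback might use is shown below. The class FirstDelivery and its methods accept and await are hypothetical names, not part of the RabbitMQ client, and the thread in main only stands in for the client's delivery thread. In real code, accept would presumably be called from handleDelivery, and when it returns true the callback would call channel.basicAck for that delivery tag and channel.basicCancel for the consumer tag.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: hands the first delivered body to a waiting thread.
class FirstDelivery {
    private final CompletableFuture<byte[]> first = new CompletableFuture<>();

    // Called from the delivery callback. Returns true only for the first body.
    boolean accept(byte[] body) {
        return first.complete(body);
    }

    // Waits for the first body and decodes it as UTF-8.
    // Throws TimeoutException if nothing arrives in time.
    String await(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return new String(first.get(timeout, unit), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        FirstDelivery handoff = new FirstDelivery();
        // Stand-in for the client's delivery thread pushing two messages.
        Thread deliveries = new Thread(() -> {
            for (String m : new String[] {"one", "two"}) {
                boolean taken = handoff.accept(m.getBytes(StandardCharsets.UTF_8));
                // Assumption: a real callback would ack and cancel when taken is true.
                System.out.println(m + (taken ? ": taken" : ": ignored"));
            }
        });
        deliveries.start();
        System.out.println(" [x] Received '" + handoff.await(1, TimeUnit.SECONDS) + "'");
        deliveries.join();
    }
}
```

With basicQos(1) and explicit acknowledgement, the broker should not push a second message before the first is acknowledged, so the "ignored" branch is mainly a safeguard.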

prathap answered Oct 14 '22 05:10



Use AMQP 0.9.1 basic.get to synchronously get just one message.

ConnectionFactory factory = new ConnectionFactory();
factory.setUri(uri);

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

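// basic.get returns immediately; the response is null when the queue is empty.
GetResponse response = channel.basicGet(QUEUE_NAME, true); // true = auto-ack
if (response != null) {
    String message = new String(response.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
}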

channel.close();
connection.close();
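Because basicGet does not wait for a message, a caller that wants exactly one message may need to retry for a while when the queue is empty. The following is a rough sketch of such a retry loop, written against a plain Supplier so that it runs without a broker. The class PollOnce and the method receiveOne are hypothetical names; in real code the supplier would presumably wrap channel.basicGet(QUEUE_NAME, true) and return response.getBody(), or null when the response is null.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper: polls a source until it yields one message or gives up.
class PollOnce {
    // Calls fetch up to maxAttempts times, pausing between attempts, and
    // returns the first non-null body decoded as UTF-8.
    static Optional<String> receiveOne(Supplier<byte[]> fetch, int maxAttempts, long pauseMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            byte[] body = fetch.get();
            if (body != null) {
                return Optional.of(new String(body, StandardCharsets.UTF_8));
            }
            if (attempt < maxAttempts) {
                Thread.sleep(pauseMillis);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a queue that is empty on the first two polls.
        int[] calls = {0};
        Supplier<byte[]> fake =
                () -> ++calls[0] < 3 ? null : "hello".getBytes(StandardCharsets.UTF_8);
        String message = receiveOne(fake, 5, 10L).orElse("<no message>");
        System.out.println(" [x] Received '" + message + "'");
    }
}
```

Polling trades latency for simplicity. If the message may take a long time to arrive, a push-based consumer that cancels itself after the first delivery is likely the better fit.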
Somu answered Oct 14 '22 03:10
