With the RabbitMQ example below, the consumer receives all the messages in the queue at once. How can I consume just one message and then exit?
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [x] Received '" + message + "'");
}
In order to consume messages there has to be a queue. When a new consumer is added, assuming there are already messages ready in the queue, deliveries will start immediately. The target queue can be empty at the time of consumer registration. In that case first deliveries will happen when new messages are enqueued.
Interface MessageConsumer. A client uses a MessageConsumer object to receive messages from a destination. A MessageConsumer object is created by passing a Destination object to a message-consumer creation method supplied by a session. MessageConsumer is the parent interface for all message consumers.
In RabbitMQ, exactly-once delivery is not supported due to the combination of complex routing and the push-based delivery. Generally, it's recommended to use at-least-once delivery with idempotent consumers.
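To illustrate what an idempotent consumer can look like, here is a minimal sketch in plain Java. It assumes every message carries a unique ID chosen by the publisher, and it keeps the set of processed IDs in memory; `IdempotentHandler` and its methods are hypothetical names, not part of the RabbitMQ client, and a real system would probably need a durable store instead of a `HashSet`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of the RabbitMQ client: remembers which
// message IDs were already processed so that redeliveries are harmless.
class IdempotentHandler {
    // Assumption: IDs are kept in memory only; they are lost on restart.
    private final Set<String> processedIds = new HashSet<>();
    private int total = 0;

    // Applies the message once; returns false if this ID was seen before.
    boolean handle(String messageId, int amount) {
        if (!processedIds.add(messageId)) {
            return false; // duplicate delivery: skip the side effect
        }
        total += amount;
        return true;
    }

    int total() {
        return total;
    }
}
```

With this in place, a message that is redelivered after a lost acknowledgement changes the total only once, so at-least-once delivery does not lead to double processing.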
RabbitMQ uses a push model and prevents overwhelming consumers via the consumer configured prefetch limit. This is great for low latency messaging and works well for RabbitMQ's queue based architecture. Kafka on the other hand uses a pull model where consumers request batches of messages from a given offset.
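The effect of a prefetch limit on push delivery can be sketched with a toy model in plain Java. This is only an illustration of the idea, not how the broker is implemented; `PrefetchModel` and its methods are made-up names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of push delivery with a prefetch limit. This is an
// illustration only, not how the RabbitMQ broker is implemented.
class PrefetchModel {
    private final Queue<String> ready = new ArrayDeque<>();
    private final int prefetch;
    private int unacked = 0;

    PrefetchModel(int prefetch) {
        this.prefetch = prefetch;
    }

    void publish(String message) {
        ready.add(message);
    }

    // Pushes messages until the queue is empty or the consumer holds
    // `prefetch` unacknowledged messages; returns what was pushed.
    List<String> push() {
        List<String> delivered = new ArrayList<>();
        while (unacked < prefetch && !ready.isEmpty()) {
            delivered.add(ready.remove());
            unacked++;
        }
        return delivered;
    }

    // An acknowledgement frees one slot in the prefetch window.
    void ack() {
        if (unacked > 0) {
            unacked--;
        }
    }
}
```

With a prefetch of 1, the model pushes a single message and then nothing more until that message is acknowledged, which is why a consumer with a low prefetch limit does not receive the whole queue at once.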
You have to set basicQos(1) so that the broker delivers only one unacknowledged message at a time, and disable auto-ack so that you acknowledge each message explicitly.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Allow at most one unacknowledged message on this channel at a time.
channel.basicQos(1);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("[*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
// The two-argument basicConsume uses explicit acknowledgements (autoAck is off).
channel.basicConsume(QUEUE_NAME, consumer);
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    // Number of messages still ready in the queue.
    int n = channel.queueDeclarePassive(QUEUE_NAME).getMessageCount();
    System.out.println(n);
    if (delivery != null) {
        byte[] bs = delivery.getBody();
        System.out.println(new String(bs));
        // Acknowledge only this delivery (multiple = false).
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        // To stop after one message, break out of the loop here and close the channel and connection.
    }
}
Hope it helps!
Use AMQP 0.9.1 basic.get to synchronously get just one message.
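import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

ConnectionFactory factory = new ConnectionFactory();
factory.setUri(uri);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// basicGet returns null when the queue is empty; autoAck = true acknowledges on retrieval.
GetResponse response = channel.basicGet(QUEUE_NAME, true);
if (response != null) {
    String message = new String(response.getBody(), "UTF-8");
    System.out.println(message);
}
channel.close();
connection.close();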