Using the above code, I was able to consume messages sent by multiple producers to the same exchange with different routing keys, and to insert each message into the database.
But this consumes too many resources, because the messages are inserted into the DB one after the other. So I decided to switch to batch inserts, and I found that I can set BasicQos.
After setting the message limit to 10 in BasicQos, I expected Console.WriteLine to write 10 messages, but that is not what happens.
What I want is to consume N messages from the queue, do a bulk insert, and send an ACK on success, otherwise no ACK.
Here is the code I use:
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_A");
        channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_B");
        channel.BasicQos(0, 10, false);
        var consumer = new EventingBasicConsumer(channel);
        channel.BasicConsume(queue: "queueName", noAck: false, consumer: consumer);
        consumer.Received += (model, ea) =>
        {
            try
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                // Insert into Database
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                Console.WriteLine(" Recevier Ack " + ea.DeliveryTag);
            }
            catch (Exception e)
            {
                channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                Console.WriteLine(" Recevier No Ack " + ea.DeliveryTag);
            }
        };
        Console.ReadLine();
    }
}
To increase performance and consume more messages at a time, do as follows: open the "RabbitMQ connection" and go to the Event sources tab. In Advanced Settings > "Other Attributes:", add the "concurrentConsumers" property, for instance: concurrentConsumers=10.
In order to consume messages there has to be a queue. When a new consumer is added, assuming there are already messages ready in the queue, deliveries will start immediately. The target queue can be empty at the time of consumer registration. In that case the first deliveries will happen when new messages are enqueued.
RabbitMQ has a plugin for a consistent hash exchange. Using that exchange, and one consumer per queue, we can preserve message order with multiple consumers. The hash exchange distributes routing keys among queues, instead of messages among queues. This means all messages with the same routing key will go to the same queue.
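To illustrate why messages with the same routing key end up in the same queue, here is a minimal sketch of a hash ring in plain Java. It is only a toy model and not the plugin's actual implementation; the HashRing class, its method names and the "points" parameter are hypothetical, and String.hashCode() stands in for whatever hash function the plugin really uses.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy consistent-hash ring (hypothetical, not the RabbitMQ plugin's code). */
class HashRing {
    // Position on the ring -> queue name that owns that position.
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    /** Places the queue at several positions on the ring. */
    void addQueue(String queue, int points) {
        for (int i = 0; i < points; i++) {
            ring.put((queue + "#" + i).hashCode(), queue);
        }
    }

    /** Picks the first queue at or after the key's hash, wrapping around at the end. */
    String route(String routingKey) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no queues bound");
        }
        Map.Entry<Integer, String> entry = ring.ceilingEntry(routingKey.hashCode());
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }
}
```

Calling route("Producer_A") repeatedly on the same ring always returns the same queue, which is what keeps per-key ordering intact when each queue has a single consumer.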
If you want multiple consumers to receive the same message, do the following. Create multiple queues, one for each app that is to receive the message, and in each queue's properties "bind" a routing tag with the amq.direct exchange. Change your publishing app to send to amq.
BasicQos = 10 means that the client prefetches at most 10 unacknowledged messages at a time, but your consumer callback is still invoked with one message at a time.
Read here: https://www.rabbitmq.com/consumer-prefetch.html
AMQP specifies the basic.qos method to allow you to limit the number of unacknowledged messages on a channel (or connection) when consuming (aka "prefetch count").
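To make the prefetch limit concrete, here is a toy model of the unacknowledged-message window in plain Java. It only mimics the counting rule described above and does not talk to a broker; PrefetchWindow and its methods are hypothetical names. A prefetch count of 0 is treated as "no limit", as in AMQP.

```java
import java.util.TreeSet;

/** Toy model of a prefetch window (hypothetical, no broker involved). */
class PrefetchWindow {
    private final int prefetchCount;
    // Delivery tags that were handed out but not yet acknowledged.
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1;

    PrefetchWindow(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    /** Returns the new delivery tag, or -1 if the window is full. */
    long tryDeliver() {
        if (prefetchCount > 0 && unacked.size() >= prefetchCount) {
            return -1;
        }
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    /** multiple = true acknowledges every tag up to and including deliveryTag. */
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            unacked.headSet(deliveryTag, true).clear();
        } else {
            unacked.remove(deliveryTag);
        }
    }

    int unackedCount() {
        return unacked.size();
    }
}
```

With a window of 10, the eleventh tryDeliver() is refused until something is acknowledged. This is also why a batch size larger than the prefetch count can never fill up: the broker stops sending before the batch is complete.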
For your purpose you have to collect the messages in a temporary list and then insert them into the DB in one batch.
Then you can acknowledge the whole batch with:
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);
void basicAck(long deliveryTag, boolean multiple) throws IOException
Parameters:
deliveryTag - the tag from the received AMQP.Basic.GetOk or AMQP.Basic.Deliver
multiple - true to acknowledge all messages up to and including the supplied delivery tag; false to acknowledge just the supplied delivery tag.
Example
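// Buffer for messages that have been received but not yet written to the DB.
final List<String> myMessagges = new ArrayList<String>();
// autoAck is false, so nothing is acknowledged until basicAck is called below.
channel.basicConsume("my_queue", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // Decode explicitly as UTF-8 instead of relying on the platform charset.
        myMessagges.add(new String(body, java.nio.charset.StandardCharsets.UTF_8));
        System.out.println("Received...");
        // The batch only fills up if the prefetch count is at least 10.
        if (myMessagges.size() >= 10) {
            System.out.println("insert into DB...");
            // multiple = true acknowledges every message up to and including this tag.
            channel.basicAck(envelope.getDeliveryTag(), true);
            myMessagges.clear();
        }
    }
});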
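The example above does not cover the "no ACK on failure" part of the question. Here is a minimal sketch of just the batching decision, kept free of the RabbitMQ client so it runs with the JDK alone. BatchBuffer, Action and the bulkInsert callback are hypothetical names for illustration, not part of any library, and the real insert logic is assumed to live in that callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical helper: buffers messages and says when to ack or nack the batch. */
class BatchBuffer {
    /** What the caller should send to the broker after a delivery. */
    enum Action { NONE, ACK_MULTIPLE, NACK_MULTIPLE }

    private final int batchSize;
    // Assumed callback: returns true if the bulk insert succeeded.
    private final Predicate<List<String>> bulkInsert;
    private final List<String> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Predicate<List<String>> bulkInsert) {
        this.batchSize = batchSize;
        this.bulkInsert = bulkInsert;
    }

    /** Adds one message; flushes the batch once batchSize messages are buffered. */
    Action onDelivery(String message) {
        pending.add(message);
        if (pending.size() < batchSize) {
            return Action.NONE;
        }
        boolean ok;
        try {
            // Hand over a copy so the callback cannot see later changes to the buffer.
            ok = bulkInsert.test(new ArrayList<>(pending));
        } catch (RuntimeException e) {
            // A thrown exception counts as a failed insert.
            ok = false;
        }
        pending.clear();
        return ok ? Action.ACK_MULTIPLE : Action.NACK_MULTIPLE;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Inside handleDelivery you would call onDelivery(...) and then, depending on the result, channel.basicAck(envelope.getDeliveryTag(), true) or channel.basicNack(envelope.getDeliveryTag(), true, true) to requeue the whole batch. Keep the batch size no larger than the prefetch count. A partially filled batch stays unacknowledged until more messages arrive, so a real consumer would probably also want a timer-based flush.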