I am creating a consumer that runs in an infinite loop to read messages from the queue. I am looking for advice/sample code on how to recover abd continue within my infinite loop even if there are network disruptions. The consumer has to stay running as it will be installed as a WindowsService.
1) Can someone please explain how to properly use these settings? What is the difference between them?
NetworkRecoveryInterval
AutomaticRecoveryEnabled
RequestedHeartbeat
2) Please see my current sample code for the consumer. I am using the .Net RabbitMQ Client v3.5.6.
How will the above settings do the "recovery" for me? e.g. will consumer.Queue.Dequeue block until it is recovered? That doesn't seem right so...
Do I have to code for this manually? e.g. will consumer.Queue.Dequeue throw an exception for which I have to detect and manually re-create my connection, channel, and consumer? Or just the consumer, as "AutomaticRecovery" will recover the channel for me?
Does this mean I should move the consumer creation inside the while loop? what about the channel creation? and the connection creation?
3) Assuming I have to do some of this recovery code manually, are there event callbacks (and how do I register for them) to tell me that there are network problems?
Thanks!
public void StartConsumer(string queue)
{
using (IModel channel = this.Connection.CreateModel())
{
var consumer = new QueueingBasicConsumer(channel);
const bool noAck = false;
channel.BasicConsume(queue, noAck, consumer);
// do I need these conditions? or should I just do while(true)???
while (channel.IsOpen &&
Connection.IsOpen &&
consumer.IsRunning)
{
try
{
BasicDeliverEventArgs item;
if (consumer.Queue.Dequeue(Timeout, out item))
{
string message = System.Text.Encoding.UTF8.GetString(item.Body);
DoSomethingMethod(message);
channel.BasicAck(item.DeliveryTag, false);
}
}
catch (EndOfStreamException ex)
{
// this is likely due to some connection issue -- what am I to do?
}
catch (Exception ex)
{
// should never happen, but lets say my DoSomethingMethod(message); throws an exception
// presumably, I'll just log the error and keep on going
}
}
}
}
public IConnection Connection
{
get
{
if (_connection == null) // _connection defined in class -- private static IConnection _connection;
{
_connection = CreateConnection();
}
return _connection;
}
}
private IConnection CreateConnection()
{
ConnectionFactory factory = new ConnectionFactory()
{
HostName = "RabbitMqHostName",
UserName = "RabbitMqUserName",
Password = "RabbitMqPassword",
};
// why do we need to set this explicitly? shouldn't this be the default?
factory.AutomaticRecoveryEnabled = true;
// what is a good value to use?
factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(5);
// what is a good value to use? How is this different from NetworkRecoveryInterval?
factory.RequestedHeartbeat = 5;
IConnection connection = factory.CreateConnection();
return connection;
}
RabbitMQ uses so-called 'heartbeats' as a keep-alive mechanism with which, in simple terms, the client is expected to send heartbeats to the server informing it that it is still alive and running.
RabbitMQ Durable queues are those that can withstand a RabbitMQ restart. If a queue is not durable, all messages will be lost if RabbitMQ is shut down for any reason.
Topology recovery involves recovery of exchanges, queues, bindings and consumers.
Registering a Consumer (Subscribing, "Push API") Applications can subscribe to have RabbitMQ push enqueued messages (deliveries) to them. This is done by registering a consumer (subscription) on a queue. After a subscription is in place, RabbitMQ will begin delivering messages.
The documentation on RabbitMQ's site is actually really good. If you want to recover queues, exchanges and consumers, you're looking for topology recovery, which is enabled by default. Automatic Recovery (which is enabled by default) includes:
basic.qos
setting, publisher confirms and transaction settingsThe NetworkRecoveryInterval
is the amount of time before a retry on an automatic recovery is performed (defaults to 5s).
Heartbeat has another purpose, namely to identify dead TCP connections. There are more to read about that at RabbitMQ's site.
Writing reliable code for recovery is tricky. The EndOfStreamException
is (as you suspect) most likely due to network problems. If you use the management plugin, you can reproduce this by closing the connection from there and see that the exception is triggered. For production-like applications, you might want to have a set of brokers that you alternate between in case of connection failure. If you have several RabbitMQ brokers, you might also want to guard yourself against long-term server failure on one or more of the servers. You might want to implement error strategies, like requeuing the message, or using a dead letter exchange.
I've been thinking a bit of these things and written a thin client, RawRabbit, that handles some of these things. Maybe it could be something for you? If not, I would suggest that you change the QueueingBasicConsumer
to an EventingBasicConsumer
. It is event driven, rather than thread blocking.
var eventConsumer = new EventingBasicConsumer(channel);
eventConsumer.Received += (sender, args) =>
{
var body = args.Body;
eventConsumer.Model.BasicAck(args.DeliveryTag, false);
};
channel.BasicConsume(queue, false, eventConsumer);
If you have topology recovery activated, the consumer will be restored by the RabbitMQ Client and start receiving messages again.
For more granular control, hook up event handlers for ConsumerCancelled
and Shutdown
to detect connectivity problems and Registered
to know when the consumer can be used again.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With