Just spent my first few hours looking at Redis and Redis MQ.
Slowly getting the hang of Redis and was wondering how you could resend a message that is in a dead letter queue?
Also, where are the configuration options which determine how many times a message is retried before it goes into the dead letter queue?
Redis Lists and Redis Sorted Sets are the basis for implementing message queues. They can be used either directly, to build bespoke solutions, or via a framework that makes message processing more idiomatic for your programming language of choice.
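To make the list-based queue idea concrete, here is a minimal in-memory sketch of an input list, a dead letter list, and a retry limit. It is only a model: Envelope, ListQueueModel and their members are hypothetical names invented for illustration, not ServiceStack or Redis APIs, and two LinkedList instances stand in for two Redis lists.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model; these types are NOT part of ServiceStack or Redis.
public sealed class Envelope
{
    public string Body = "";
    public int RetryAttempts;
}

public sealed class ListQueueModel
{
    // Two linked lists stand in for two Redis lists.
    public readonly LinkedList<Envelope> InQ = new LinkedList<Envelope>();
    public readonly LinkedList<Envelope> Dlq = new LinkedList<Envelope>();
    private readonly int retryCount;

    public ListQueueModel(int retryCount)
    {
        this.retryCount = retryCount;
    }

    // Producer side: comparable to pushing onto the head of the input list.
    public void Publish(string body)
    {
        InQ.AddFirst(new Envelope { Body = body });
    }

    // Consumer side: take from the tail, run the handler, and on failure
    // either requeue the message or move it to the dead letter list.
    // Returns the total number of handler invocations.
    public int ProcessAll(Func<string, bool> handler)
    {
        var calls = 0;
        while (InQ.Count > 0)
        {
            var msg = InQ.Last.Value;
            InQ.RemoveLast();
            calls++;

            if (handler(msg.Body)) continue;   // handled successfully

            if (msg.RetryAttempts < retryCount)
            {
                msg.RetryAttempts++;
                InQ.AddFirst(msg);             // retry later
            }
            else
            {
                Dlq.AddFirst(msg);             // retries exhausted
            }
        }
        return calls;
    }
}
```

In this model, a handler that always fails on a queue built with retryCount 2 is invoked three times (the first attempt plus two retries) before the message lands in Dlq.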
In IBM MQ (a different product from Redis MQ), you tell the queue manager about the dead-letter queue by specifying a dead-letter queue name on the crtmqm command (crtmqm -u DEAD.LETTER.QUEUE, for example), or by using the DEADQ attribute on the ALTER QMGR command to specify one later. You must define the dead-letter queue before using it.
To process messages on a dead-letter queue (DLQ), IBM MQ supplies a default DLQ handler. The handler matches messages on the DLQ against entries in a rules table that you define. Messages can be put on a DLQ by queue managers, message channel agents (MCAs), and applications.
Currently, there's no way to automatically resend messages in the dead letter queue in ServiceStack. However, you can do this manually relatively easily:
To reload messages from the dead letter queue, call a recovery helper before starting the MQ host. The retry count is set when the host is created:
public class AppHost : AppHostBase {
    // constructor omitted

    public override void Configure(Container container) {
        // create the hostMq ...
        // (clients is your Redis client manager, created elsewhere)
        var hostMq = new RedisMqHost( clients, retryCount: 2 );
        // with retryCount = 2, 3 total attempts are made. 1st + 2 retries

        // before starting hostMq
        RecoverDLQMessages<TheMessage>(hostMq);

        // add handlers
        hostMq.RegisterHandler<TheMessage>( m =>
            this.ServiceController.ExecuteMessage( m ) );

        // start hostMq
        hostMq.Start();
    }
}
Which ultimately uses the following to recover (requeue) messages:
private void RecoverDLQMessages<T>( RedisMqHost hostMq )
{
    var client = hostMq.CreateMessageQueueClient();
    var errorQueue = QueueNames<T>.Dlq;

    log.Info( "Recovering Dead Messages from: {0}", errorQueue );

    var recovered = 0;
    byte[] msgBytes;
    while( (msgBytes = client.Get( errorQueue, TimeSpan.FromSeconds(1) )) != null )
    {
        var msg = msgBytes.ToMessage<T>();
        msg.RetryAttempts = 0;
        client.Publish( msg );
        recovered++;
    }

    log.Info( "Recovered {0} from {1}", recovered, errorQueue );
}
At the time of writing, ServiceStack can lose messages during this recovery (see Issue 229), so don't kill the process while it is moving messages from the dead letter queue (DLQ) back to the input queue. Under the hood, ServiceStack POPs each message from Redis before republishing it, so a message that has been popped but not yet published exists only in the process's memory.
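The risk can be illustrated with a small in-memory sketch. RequeueModel and its methods are hypothetical names made up for this example, and the two Queue instances only stand in for the DLQ and input lists; this is not how ServiceStack is implemented. The crash flag simulates the process dying between the two steps. The second method shows one common mitigation, assuming consumers can tolerate duplicates: copy first, remove second, so a crash leaves a duplicate rather than a gap.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of the crash window; NOT ServiceStack code.
public static class RequeueModel
{
    // Pop, then push. If the process dies between the two steps
    // (simulated by crashAfterPop), the message is in neither queue.
    public static void PopThenPush(Queue<string> dlq, Queue<string> inq, bool crashAfterPop)
    {
        var msg = dlq.Dequeue();
        if (crashAfterPop) return;   // msg lived only in process memory: lost
        inq.Enqueue(msg);
    }

    // Copy, then remove. If the process dies between the two steps
    // (simulated by crashAfterCopy), the message is in both queues:
    // it may be delivered twice, but it is not lost.
    public static void CopyThenRemove(Queue<string> dlq, Queue<string> inq, bool crashAfterCopy)
    {
        inq.Enqueue(dlq.Peek());
        if (crashAfterCopy) return;  // duplicate remains in the DLQ
        dlq.Dequeue();
    }
}
```

Redis itself also offers server-side commands that move an element between two lists in a single atomic step (RPOPLPUSH, and LMOVE in newer versions), which avoids the window entirely. Whether that fits depends on how the MQ client serializes and routes messages, so treat it as a direction to investigate rather than a drop-in fix.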