
Is there an easy way to subscribe to the default error queue in EasyNetQ?

In my test application I can see messages that were processed with an exception being automatically inserted into the default EasyNetQ_Default_Error_Queue, which is great. I can then successfully dump or requeue these messages using the Hosepipe, which also works fine, but requires dropping down to the command line and calling against both Hosepipe and the RabbitMQ API to purge the queue of retried messages.

So I'm thinking the easiest approach for my application is to simply subscribe to the error queue, so I can re-process the messages using the same infrastructure. But in EasyNetQ, the error queue seems to be special. We need to subscribe using a proper type and subscription ID, and I'm not sure what these values should be for the error queue:

bus.Subscribe<WhatShouldThisBe>("and-this", ReprocessErrorMessage);

Can I use the simple API to subscribe to the error queue, or do I need to dig into the advanced API?

If the type of my original message was TestMessage, then I'd like to be able to do something like this:

bus.Subscribe<ErrorMessage<TestMessage>>("???", ReprocessErrorMessage);

where ErrorMessage is a class provided by EasyNetQ to wrap all errors. Is this possible?

asked Feb 25 '13 02:02 by Mike Chamberlain


1 Answer

You can't use the simple API to subscribe to the error queue because it doesn't follow EasyNetQ queue type naming conventions - maybe that's something that should be fixed ;)

But the Advanced API works fine. You won't get the original message back as a typed object, but you do get its JSON representation, which you can deserialize yourself (using Newtonsoft.Json). Here's an example of what your subscription code should look like:

[Test]
[Explicit("Requires a RabbitMQ server on localhost")]
public void Should_be_able_to_subscribe_to_error_messages()
{
    var errorQueueName = new Conventions().ErrorQueueNamingConvention();
    var queue = Queue.DeclareDurable(errorQueueName);
    var autoResetEvent = new AutoResetEvent(false);

    bus.Advanced.Subscribe<SystemMessages.Error>(queue, (message, info) =>
    {
        var error = message.Body;

        Console.Out.WriteLine("error.DateTime = {0}", error.DateTime);
        Console.Out.WriteLine("error.Exception = {0}", error.Exception);
        Console.Out.WriteLine("error.Message = {0}", error.Message);
        Console.Out.WriteLine("error.RoutingKey = {0}", error.RoutingKey);

        autoResetEvent.Set();
        return Task.Factory.StartNew(() => { });
    });

    autoResetEvent.WaitOne(1000);
}

I had to fix a small bug in the error message writing code in EasyNetQ before this worked, so please get version 0.9.2.73 or later before trying it out. You can see the code example here
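As a rough sketch of the deserialization step mentioned above: assuming error.Message carries the original message body as JSON text, it can be turned back into the original type with Newtonsoft.Json. TestMessage here is the hypothetical message type from the question (its Text property is invented for illustration), and ErrorMessageReader is an illustrative helper, not part of EasyNetQ.

```csharp
using System;
using Newtonsoft.Json;

// Hypothetical message type standing in for the question's TestMessage.
public class TestMessage
{
    public string Text { get; set; }
}

// Illustrative helper, not part of EasyNetQ.
public static class ErrorMessageReader
{
    // Assumes errorMessageJson is the JSON text taken from error.Message
    // inside the error queue subscription handler.
    public static TestMessage ReadOriginal(string errorMessageJson)
    {
        return JsonConvert.DeserializeObject<TestMessage>(errorMessageJson);
    }

    public static void Main()
    {
        // Sample JSON standing in for the content of error.Message.
        var original = ReadOriginal("{\"Text\":\"Hello\"}");
        Console.Out.WriteLine("original.Text = {0}", original.Text);
    }
}
```

In the handler above, this would be called as ErrorMessageReader.ReadOriginal(error.Message), after which the recovered message could be republished or reprocessed. If the queue can hold more than one message type, you would need some way to pick the target type first, which this sketch does not cover.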

answered Nov 06 '22 16:11 by Mike Hadlow