
How to log a failed message in MassTransit?

I'm looking for a good way to log a failed message right after the retry limit is exceeded, without having to deal with the error queue. What I've found so far:

  • I can inherit from InMemoryInboundMessageTracker and override IsRetryLimitExceeded, but at that point there is no information about the message itself except its id.
  • I can implement IInboundMessageInterceptor and get the IConsumeContext in Pre/PostDispatch, but at that point there is no information about success or failure.

So, as a solution, I can get the IConsumeContext in PreDispatch, put it in some sort of cache, and then take it out of the cache in IsRetryLimitExceeded when the retry limit is exceeded.

The methods are called in this order: IsRetryLimitExceeded -> PreDispatch -> PostDispatch

So I can't find a good place to remove a successfully processed message from the cache.

Of course, I can use a cache with a restricted size, but this whole solution seems weird.

Any thoughts on this matter would be appreciated.

amstix asked Nov 21 '13 10:11


2 Answers

I've ended up with this solution:

class MessageInterceptor: IInboundMessageInterceptor
{
    public void PreDispatch(IConsumeContext context)
    {
        MessageTracker.Register(context);
    }

    public void PostDispatch(IConsumeContext context)
    {}
}

class MessageTracker: InMemoryInboundMessageTracker
{
    readonly Logger logger;

    static readonly ConcurrentDictionary<string, IConsumeContext> DispatchingCache = new ConcurrentDictionary<string, IConsumeContext>();

    public MessageTracker(int retryLimit, Logger logger)
        : base(retryLimit)
    {
        this.logger = logger;
    }

    public static void Register(IConsumeContext context)
    {
        DispatchingCache.GetOrAdd(context.MessageId, context);
    }

    public override void MessageWasReceivedSuccessfully(string id)
    {
        base.MessageWasReceivedSuccessfully(id);

        IConsumeContext value;
        DispatchingCache.TryRemove(id, out value);
    }

    public override bool IsRetryLimitExceeded(string id, out Exception retryException, out IEnumerable<Action> faultActions)
    {
        var result = base.IsRetryLimitExceeded(id, out retryException, out faultActions);

        IConsumeContext failed;
        if (!result || !DispatchingCache.TryRemove(id, out failed))
            return result;

        // --> log the failed IConsumeContext (failed) together with retryException here

        return true;
    }
}

And to plug those classes in:

serviceBus = ServiceBusFactory.New(config =>
{
    ...
    config.AddBusConfigurator(new PostCreateBusBuilderConfigurator(sb =>
    {
        var interceptorConfig = new InboundMessageInterceptorConfigurator(sb.InboundPipeline);
        interceptorConfig.Create(new MessageInterceptor());
    }));

    config.SetDefaultInboundMessageTrackerFactory(retryLimit => new MessageTracker(retryLimit, LogManager.GetCurrentClassLogger()));
});
amstix answered Oct 04 '22 21:10


You can implement and configure your own message retry tracking on the bus, so that messages that fail are passed through your implementation. You can delegate to the default retry tracker and just intercept the events so that you can act on them, or you can implement your own retry tracking if needed.

MessageTrackerFactory is the delegate used to configure this; I think the interface is defined nearby.
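
To illustrate the "delegate to the default tracker and intercept the events" idea, here is a minimal, self-contained sketch using composition rather than inheritance. Everything in it is an assumption made for illustration: IRetryTracker is a simplified stand-in and not MassTransit's actual interface (the real one has more members and a different IsRetryLimitExceeded signature), and CountingRetryTracker and LoggingRetryTracker are hypothetical names. Neither class is thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for a retry-tracker interface.
// It is NOT MassTransit's actual interface.
public interface IRetryTracker
{
    bool IsRetryLimitExceeded(string id, out Exception retryException);
    void IncrementRetryCount(string id, Exception exception);
    void MessageWasReceivedSuccessfully(string id);
}

// Minimal in-memory tracker standing in for the default implementation.
public class CountingRetryTracker : IRetryTracker
{
    readonly int retryLimit;
    readonly Dictionary<string, List<Exception>> failures =
        new Dictionary<string, List<Exception>>();

    public CountingRetryTracker(int retryLimit)
    {
        this.retryLimit = retryLimit;
    }

    public bool IsRetryLimitExceeded(string id, out Exception retryException)
    {
        retryException = null;
        List<Exception> seen;
        if (!failures.TryGetValue(id, out seen) || seen.Count < retryLimit)
            return false;

        // Report the most recent failure for this message.
        retryException = seen[seen.Count - 1];
        return true;
    }

    public void IncrementRetryCount(string id, Exception exception)
    {
        List<Exception> seen;
        if (!failures.TryGetValue(id, out seen))
            failures[id] = seen = new List<Exception>();
        seen.Add(exception);
    }

    public void MessageWasReceivedSuccessfully(string id)
    {
        failures.Remove(id);
    }
}

// Decorator: forwards every call to the inner tracker and invokes a
// callback whenever the inner tracker reports the limit as exceeded.
public class LoggingRetryTracker : IRetryTracker
{
    readonly IRetryTracker inner;
    readonly Action<string, Exception> onRetryLimitExceeded;

    public LoggingRetryTracker(IRetryTracker inner, Action<string, Exception> onRetryLimitExceeded)
    {
        this.inner = inner;
        this.onRetryLimitExceeded = onRetryLimitExceeded;
    }

    public bool IsRetryLimitExceeded(string id, out Exception retryException)
    {
        bool exceeded = inner.IsRetryLimitExceeded(id, out retryException);
        if (exceeded)
            onRetryLimitExceeded(id, retryException);
        return exceeded;
    }

    public void IncrementRetryCount(string id, Exception exception)
    {
        inner.IncrementRetryCount(id, exception);
    }

    public void MessageWasReceivedSuccessfully(string id)
    {
        inner.MessageWasReceivedSuccessfully(id);
    }
}
```

In this sketch the callback fires on every call that reports the limit as exceeded, so a real logger may need to de-duplicate by message id. The callback also receives only the id and the exception, which is the same limitation described in the question: the message body still has to come from somewhere else, such as a cache filled in PreDispatch.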

Chris Patterson answered Oct 04 '22 20:10