I'm looking for a good way to log a failed message right after the retry limit is exceeded, without having to deal with the error queue. What I've found so far:
As a workaround, I can capture the IConsumeContext in PreDispatch, put it in some sort of cache, and then pull it back out of the cache in IsRetryLimitExceeded once the retry limit has been exceeded.
The methods are called in this order: IsRetryLimitExceeded -> PreDispatch -> PostDispatch
So I can't find a good place to remove a successfully processed message from the cache.
Of course, I could use a cache with a restricted size, but the whole solution seems weird.
Any thoughts on this matter would be appreciated.
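For reference, the "cache with restricted size" idea could look roughly like the sketch below. BoundedCache is a hypothetical helper, not something MassTransit provides; it assumes that evicting the oldest entry is acceptable once the cache is full, and it keeps the first value registered for a key so that a redelivered message does not overwrite its original context.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of MassTransit): a thread-safe cache that
// holds at most `capacity` entries and evicts the oldest one when full.
public sealed class BoundedCache<TKey, TValue>
{
    readonly int capacity;
    readonly object gate = new object();
    readonly Dictionary<TKey, LinkedListNode<KeyValuePair<TKey, TValue>>> index =
        new Dictionary<TKey, LinkedListNode<KeyValuePair<TKey, TValue>>>();
    readonly LinkedList<KeyValuePair<TKey, TValue>> order =
        new LinkedList<KeyValuePair<TKey, TValue>>();

    public BoundedCache(int capacity)
    {
        if (capacity <= 0)
            throw new ArgumentOutOfRangeException("capacity");
        this.capacity = capacity;
    }

    public int Count
    {
        get { lock (gate) return index.Count; }
    }

    // Adds the entry unless the key is already cached.
    public void Register(TKey key, TValue value)
    {
        lock (gate)
        {
            if (index.ContainsKey(key))
                return;

            // Make room by dropping the oldest entry.
            if (index.Count == capacity)
            {
                var oldest = order.First;
                order.RemoveFirst();
                index.Remove(oldest.Value.Key);
            }

            index[key] = order.AddLast(new KeyValuePair<TKey, TValue>(key, value));
        }
    }

    // Removes the entry and returns its value, if it is still cached.
    public bool TryRemove(TKey key, out TValue value)
    {
        lock (gate)
        {
            LinkedListNode<KeyValuePair<TKey, TValue>> node;
            if (!index.TryGetValue(key, out node))
            {
                value = default(TValue);
                return false;
            }

            order.Remove(node);
            index.Remove(key);
            value = node.Value.Value;
            return true;
        }
    }
}
```

With something like this, PreDispatch would call Register(context.MessageId, context) and IsRetryLimitExceeded would call TryRemove. The trade-off is that the context of a message that fails long after it was first seen may already have been evicted, so the capacity has to be chosen with the expected throughput in mind.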
I've ended up with this solution:
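using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
// plus the MassTransit namespaces for the types below
// and the logging library's namespace for Logger

class MessageInterceptor : IInboundMessageInterceptor
{
    // Remember the context so it can be logged if the message later fails.
    public void PreDispatch(IConsumeContext context)
    {
        MessageTracker.Register(context);
    }

    public void PostDispatch(IConsumeContext context)
    {
    }
}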
class MessageTracker : InMemoryInboundMessageTracker
{
    readonly Logger logger;

    // Contexts of messages currently being dispatched, keyed by message id.
    static readonly ConcurrentDictionary<string, IConsumeContext> DispatchingCache =
        new ConcurrentDictionary<string, IConsumeContext>();

    public MessageTracker(int retryLimit, Logger logger)
        : base(retryLimit)
    {
        this.logger = logger;
    }

    public static void Register(IConsumeContext context)
    {
        // GetOrAdd keeps the first context seen for a given message id.
        DispatchingCache.GetOrAdd(context.MessageId, context);
    }

    public override void MessageWasReceivedSuccessfully(string id)
    {
        base.MessageWasReceivedSuccessfully(id);

        // The message was processed, so its context is no longer needed.
        IConsumeContext value;
        DispatchingCache.TryRemove(id, out value);
    }

    public override bool IsRetryLimitExceeded(string id, out Exception retryException, out IEnumerable<Action> faultActions)
    {
        var result = base.IsRetryLimitExceeded(id, out retryException, out faultActions);

        // Nothing to log unless the limit was exceeded and the context is cached.
        IConsumeContext failed;
        if (!result || !DispatchingCache.TryRemove(id, out failed))
            return result;

        // --> log failed IConsumeContext with exception
        return true;
    }
}
And to plug those classes in:
serviceBus = ServiceBusFactory.New(config =>
{
    ...
    config.AddBusConfigurator(new PostCreateBusBuilderConfigurator(sb =>
    {
        var interceptorConfig = new InboundMessageInterceptorConfigurator(sb.InboundPipeline);
        interceptorConfig.Create(new MessageInterceptor());
    }));
    config.SetDefaultInboundMessageTrackerFactory(retryLimit => new MessageTracker(retryLimit, LogManager.GetCurrentClassLogger()));
});
You can implement and configure your own message retry tracking on the bus, so that messages that fail are passed through your implementation. You can delegate to the default retry tracker and just intercept the events so that you can act on them, or you can implement your own retry tracking if needed.
MessageTrackerFactory is the delegate used to configure it; I think the tracker interface is defined nearby.
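To illustrate the "delegate and just intercept" option, here is a minimal sketch of a wrapper that forwards every call to an inner tracker and only observes the result. IRetryTracker is a hypothetical, simplified interface made up for this example; it is not MassTransit's real tracker interface, whose members should be checked in the library source. ObservingRetryTracker and FixedTracker are likewise illustrative names.

```csharp
using System;

// Hypothetical, simplified stand-in for a retry tracker interface.
// NOT MassTransit's actual interface.
public interface IRetryTracker
{
    bool IsRetryLimitExceeded(string id, out Exception retryException);
    void MessageWasReceivedSuccessfully(string id);
}

// Forwards every call to the wrapped tracker and invokes a callback
// whenever the wrapped tracker reports that the retry limit was exceeded.
public sealed class ObservingRetryTracker : IRetryTracker
{
    readonly IRetryTracker inner;
    readonly Action<string, Exception> onRetryLimitExceeded;

    public ObservingRetryTracker(IRetryTracker inner, Action<string, Exception> onRetryLimitExceeded)
    {
        if (inner == null) throw new ArgumentNullException("inner");
        if (onRetryLimitExceeded == null) throw new ArgumentNullException("onRetryLimitExceeded");
        this.inner = inner;
        this.onRetryLimitExceeded = onRetryLimitExceeded;
    }

    public bool IsRetryLimitExceeded(string id, out Exception retryException)
    {
        bool exceeded = inner.IsRetryLimitExceeded(id, out retryException);
        if (exceeded)
            onRetryLimitExceeded(id, retryException);
        return exceeded;
    }

    public void MessageWasReceivedSuccessfully(string id)
    {
        inner.MessageWasReceivedSuccessfully(id);
    }
}

// Trivial inner tracker used only to exercise the wrapper:
// it always gives the same answer.
public sealed class FixedTracker : IRetryTracker
{
    readonly bool exceeded;

    public FixedTracker(bool exceeded)
    {
        this.exceeded = exceeded;
    }

    public bool IsRetryLimitExceeded(string id, out Exception retryException)
    {
        retryException = exceeded ? new InvalidOperationException("consumer failed") : null;
        return exceeded;
    }

    public void MessageWasReceivedSuccessfully(string id)
    {
    }
}
```

In a real bus, the callback would be the place to write the log entry, and the wrapper would be returned from the tracker factory in place of the default tracker. The subclassing approach shown in the other answer achieves the same thing through inheritance instead of composition.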