
How can I attach a header to all published messages?

Tags: masstransit

I have a particular header that I'd like to attach to all messages that I publish. I can attach this header on a case-by-case basis by specifying it in the Publish call:

_bus.Publish(myMessage, context => context.SetHeader("my key", "my value"));

This works fine but it becomes a chore to maintain this SetHeader call for every publish. Is there a way, during bus configuration or anywhere else, to specify a header that will be attached to all messages? That is, is there a way to do something like the following?

ServiceBusFactory.New(sbc =>
{        
    sbc.UseRabbitMq();
    sbc.ReceiveFrom(hdoQueue);

    // This is what I'd like to be able to do:
    sbc.BeforePublish(context => context.SetHeader("my key", "my value"));
});

I believe there is a solution that involves implementing IOutboundMessageInterceptor but I can't find a way to attach my interceptor. There is a ServiceBusConfigurator.AddInboundInterceptor method but not a ServiceBusConfigurator.AddOutboundInterceptor method.

ean5533 asked Apr 03 '14 21:04


1 Answer

My intuition was correct. I was able to do what I wanted by implementing IOutboundMessageInterceptor:

public class AttachHeadersOutboundInterceptor : IOutboundMessageInterceptor
{
    public void PreDispatch(ISendContext context)
    {
        context.SetHeader("my key", "my value");
    }

    public void PostDispatch(ISendContext context)
    {
    }
}

Oddly, there is no ServiceBusConfigurator.AddOutboundInterceptor method, so I created one as an extension method (by copying the code for AddInboundInterceptor from GitHub):

public static class MassTransitExtensions
{
    public static void AddOutboundInterceptor(this ServiceBusConfigurator configurator,
        IOutboundMessageInterceptor interceptor)
    {
        var builderConfigurator = new PostCreateBusBuilderConfigurator(bus =>
        {
            var interceptorConfigurator = new OutboundMessageInterceptorConfigurator(bus.OutboundPipeline);

            interceptorConfigurator.Create(interceptor);
        });

        configurator.AddBusConfigurator(builderConfigurator);
    }
}

And then I attach it during bus configuration:

ServiceBusFactory.New(sbc =>
{        
    sbc.UseRabbitMq();
    sbc.ReceiveFrom(hdoQueue);

    sbc.AddOutboundInterceptor(new AttachHeadersOutboundInterceptor());
});

Problem solved.
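
If more than one header is needed, the same idea could be generalized so the interceptor takes its headers as data instead of hard-coding them. The following is only a sketch: StaticHeadersOutboundInterceptor is a hypothetical name, the using directives are my assumption about where these types live in MassTransit 2.x, and the only MassTransit members used are the ones already shown above (IOutboundMessageInterceptor, ISendContext.SetHeader).

```csharp
using System.Collections.Generic;
using MassTransit;           // ISendContext (namespace is an assumption)
using MassTransit.Pipeline;  // IOutboundMessageInterceptor (namespace is an assumption)

// Hypothetical variant of AttachHeadersOutboundInterceptor that attaches
// every key/value pair it was constructed with.
public class StaticHeadersOutboundInterceptor : IOutboundMessageInterceptor
{
    private readonly IDictionary<string, string> _headers;

    public StaticHeadersOutboundInterceptor(IDictionary<string, string> headers)
    {
        // Copy the dictionary so later changes made by the caller
        // do not affect the headers attached to outgoing messages.
        _headers = new Dictionary<string, string>(headers);
    }

    public void PreDispatch(ISendContext context)
    {
        // Called before each outbound message is dispatched.
        foreach (var header in _headers)
        {
            context.SetHeader(header.Key, header.Value);
        }
    }

    public void PostDispatch(ISendContext context)
    {
        // Nothing to do after dispatch.
    }
}
```

It would be attached with the same extension method as before, for example sbc.AddOutboundInterceptor(new StaticHeadersOutboundInterceptor(new Dictionary<string, string> { { "my key", "my value" } }));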

ean5533 answered Nov 04 '22 18:11