Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Persist headers when redelivering a RabbitMq message using MassTransit

Purpose: I need to keep track of headers when I redeliver a message.

Configuration:

  • RabbitMQ 3.7.9
  • Erlang 21.2
  • MassTransit 5.1.5
  • MySql 8.0 for the Quartz database

What I've tried without success:

first attempt:

await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) => {
   if (consumeCtx.Headers.TryGetHeader("SenderApp", out object sender))
   {
      sendCtx.Headers.Set("SenderApp", sender);
   }
}).ConfigureAwait(false);

second attempt:

protected Task ScheduleSend(Uri rabbitUri, double delay)
{
  return GetBus().ScheduleSend<IProcessOrganisationUpdate>(
    rabbitUri,
    TimeSpan.FromSeconds(delay),
    _Data,
    new HeaderPipe(_SenderApp, 0));
}

public class HeaderPipe : IPipe<SendContext>
{
  private readonly byte   _Priority;
  private readonly string _SenderApp;

  public HeaderPipe (byte priority)
  {
    _Priority  = priority;
    _SenderApp = Assembly.GetEntryAssembly()?.GetName()?.Name ?? "Default";
  }

  public HeaderPipe (string senderApp, byte priority)
  {
    _Priority  = priority;
    _SenderApp = senderApp;
  }

  public void Probe (ProbeContext context)
  { }

  public Task Send (SendContext context)
  {
    context.Headers.Set("SenderApp", _SenderApp);
    context.SetPriority(_Priority);
    return Task.CompletedTask;
  }
}

Expected: FinQuest.Robot.DBProcess

Result: null

I log in Consume method my SenderApp. The first time it's look like this

Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: FinQuest.Robot.DBProcess, modif: LinkedIn)

and looks like this after the redelivery

Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: , modif: LinkedIn)

What I'm doing wrong ? I don't want to use the Retry feature due to its maximum number of retry (I don't want to be limited).

Thanks in advance.

like image 492
Karine Avatar asked Oct 25 '25 06:10

Karine


1 Answers

There is a method, used by the redelivery filter, that you might want to use:

https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit/SendContextExtensions.cs#L90

public static void TransferConsumeContextHeaders(this SendContext sendContext, ConsumeContext consumeContext)

In your code, you would use it:

await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) => {
    sendCtx.TransferConsumeContextHeaders(consumeCtx);
});
like image 164
Chris Patterson Avatar answered Oct 26 '25 19:10

Chris Patterson



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!