
RabbitMq - ConversationId vs CorrelationId - Which is the more appropriate for tracking a specific request?

RabbitMQ seems to have two very similar properties, ConversationId and CorrelationId, and I don't entirely understand the difference between them.

My use case is as follows. I have a website that generates a Guid. The website calls an API, adding that unique identifier to the HttpRequest headers. This in turn publishes a message to RabbitMQ. That message is processed by the first consumer and passed off elsewhere to another consumer, and so on.

For logging purposes I want to log an identifier that ties the initial request together with all of the subsequent actions. This should be unique for that journey throughout the different parts of the application. When the logs are shipped to something like Serilog/ElasticSearch, it then becomes easy to see which request triggered the work, and all of the log entries for that request throughout the application can be correlated together.

I have created a provider that looks at the incoming HttpRequest for an identifier. I've called this a "CorrelationId", but I'm starting to wonder if this should really be named a "ConversationId". In terms of RabbitMQ, does the idea of a "ConversationId" fit better to this model, or is "CorrelationId" better?

What is the difference between the two concepts?

In terms of code, I've done the following. First, I register the bus in my API and configure the publish pipeline to use the CorrelationId from the provider.

// bus registration in the API
var busSettings = context.Resolve<BusSettings>();
// using AspNetCoreCorrelationIdProvider
var correlationIdProvider = context.Resolve<ICorrelationIdProvider>();

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host(
        new Uri(busSettings.HostAddress),
        h =>
        {
            h.Username(busSettings.Username);
            h.Password(busSettings.Password);
        });
    cfg.ConfigurePublish(x => x.UseSendExecute(sendContext =>
    {
        // which one is more appropriate
        //sendContext.ConversationId = correlationIdProvider.GetCorrelationId();
        sendContext.CorrelationId = correlationIdProvider.GetCorrelationId();
    }));
});

For reference, this is my simple provider interface

// define the interface
public interface ICorrelationIdProvider
{
    Guid GetCorrelationId();
}

And the AspNetCore implementation, which extracts the unique ID set by the calling client (i.e. a website).

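// namespaces needed by this provider
using System;
using System.Linq;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Primitives;

public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider
{
    private readonly IHttpContextAccessor _httpContextAccessor;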

    public AspNetCoreCorrelationIdProvider(IHttpContextAccessor httpContextAccessor)
    {
        _httpContextAccessor = httpContextAccessor;
    }

    public Guid GetCorrelationId()
    {
        if (_httpContextAccessor.HttpContext.Request.Headers.TryGetValue("correlation-Id", out StringValues headers))
        {
            var header = headers.FirstOrDefault();
            if (Guid.TryParse(header, out Guid headerCorrelationId))
            {
                return headerCorrelationId;
            }
        }

        return Guid.NewGuid();
    }
}

Finally, my service hosts are simple Windows service applications that sit and consume published messages. They use the following provider to grab the CorrelationId, and they might well publish to other consumers in other service hosts.

public class MessageContextCorrelationIdProvider : ICorrelationIdProvider
{
    /// <summary>
    /// The consume context
    /// </summary>
    private readonly ConsumeContext _consumeContext;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageContextCorrelationIdProvider"/> class.
    /// </summary>
    /// <param name="consumeContext">The consume context.</param>
    public MessageContextCorrelationIdProvider(ConsumeContext consumeContext)
    {
        _consumeContext = consumeContext;
    }

    /// <summary>
    /// Gets the correlation identifier.
    /// </summary>
    /// <returns></returns>
    public Guid GetCorrelationId()
    {
        // CorrelationId or ConversationId?
        if (_consumeContext.CorrelationId.HasValue && _consumeContext.CorrelationId != Guid.Empty)
        {
            return _consumeContext.CorrelationId.Value;
        }

        return Guid.NewGuid();
    }
}

I then have a logger in my consumer that uses that provider to extract the CorrelationId:

public async Task Consume(ConsumeContext<IMyEvent> context)
{
    var correlationId = _correlationProvider.GetCorrelationId();
    _logger.Info(correlationId, $"#### IMyEvent received for customer:{context.Message.CustomerId}");

    try
    {
        // object initializers assign with '=', not ':'
        await _mediator.Send(new SomeOtherRequest(correlationId) { SomeObject = context.Message.SomeObject });
    }
    catch (Exception e)
    {
        _logger.Exception(e, correlationId, $"Exception:{e}");
        throw;
    }

    _logger.Info(correlationId, $"Finished processing: {DateTime.Now}");
}

Reading the docs, it says the following about a "ConversationId":

The conversation is created by the first message that is sent or published, in which no existing context is available (such as when a message is sent or published by using IBus.Send or IBus.Publish). If an existing context is used to send or publish a message, the ConversationId is copied to the new message, ensuring that a set of messages within the same conversation have the same identifier.

Now I'm starting to think that I've got my terminology mixed up, and technically this is a conversation (although the 'conversation' is like 'the telephone game').

So, CorrelationId in this use case, or ConversationId? Please help me get my terminology right!!

Rebecca asked Mar 21 '19 16:03



1 Answer

In a message conversation (cue foreboding musical score), there can be a single message (I told you to do something, or I told everyone who is listening that something happened) or multiple messages (I told you to do something, and you told someone else, or I told everyone who is listening that something happened and those listeners told their friends, and so on, and so on).

Using MassTransit, from the first message to the final message, used properly, every single one of those messages would have the same ConversationId. MassTransit copies the property from ConsumeContext, unmodified, to every outgoing message during the consumption of a message. This makes everything part of the same trace - a conversation.

The CorrelationId, however, is not set by default by MassTransit. It can be automatically set if a message property is named CorrelationId (or CommandId, or EventId), or you can add your own names too.
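
To make the naming convention concrete, here is a toy sketch in plain C#. It is not MassTransit's implementation; `CorrelationConvention`, `TryGetCorrelationId`, `OrderSubmitted` and `Ping` are hypothetical names made up for this example. It only shows the idea of looking for a Guid property with one of the well-known names on the message.

```csharp
using System;

// Toy illustration of a name-based convention; NOT MassTransit's actual implementation.
public static class CorrelationConvention
{
    // Property names mentioned in the answer above.
    private static readonly string[] Names = { "CorrelationId", "CommandId", "EventId" };

    // Returns the value of the first matching Guid property, or null if the message has none.
    public static Guid? TryGetCorrelationId(object message)
    {
        foreach (var name in Names)
        {
            var property = message.GetType().GetProperty(name);
            if (property != null && property.PropertyType == typeof(Guid))
            {
                return (Guid)property.GetValue(message);
            }
        }

        return null;
    }
}

// Hypothetical message contracts used only for this example.
public class OrderSubmitted
{
    public Guid EventId { get; set; }
}

public class Ping
{
    public string Text { get; set; }
}
```

With these types, an `OrderSubmitted` message yields its `EventId` as the correlation identifier, while a `Ping` yields null, which matches the "not set by default" behaviour described above.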

If the CorrelationId is present on a consumed message, any outgoing messages will have that CorrelationId property copied to the InitiatorId property (cause, and effect -- the consumed message initiated the creation of the subsequent messages). This forms a chain (or span, in trace terminology) that can be followed to show the spread of messages from the initial message.
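
The propagation rules described so far can be modelled in a few lines of plain C#. This is a sketch of the behaviour as I've described it, not MassTransit code; `Envelope`, `Propagation` and `CreateOutgoing` are hypothetical names used only for illustration.

```csharp
using System;

// Toy model of the header propagation described above. This is NOT MassTransit code;
// Envelope and CreateOutgoing are hypothetical names used only for illustration.
public sealed class Envelope
{
    public Guid? ConversationId { get; set; }
    public Guid? CorrelationId { get; set; }
    public Guid? InitiatorId { get; set; }
}

public static class Propagation
{
    // Builds the headers for a message sent while 'consumed' is being handled.
    // Pass null for 'consumed' to model a send with no existing context.
    public static Envelope CreateOutgoing(Envelope consumed, Guid? correlationId = null)
    {
        return new Envelope
        {
            // Copied unchanged through the whole conversation; created when there is no context.
            ConversationId = consumed?.ConversationId ?? Guid.NewGuid(),
            // Not set by default; only present if the caller (or a convention) supplies it.
            CorrelationId = correlationId,
            // Cause and effect: the consumed message's CorrelationId becomes the InitiatorId.
            InitiatorId = consumed?.CorrelationId
        };
    }
}
```

If you chain three messages through `CreateOutgoing`, all three share one ConversationId, while each message's InitiatorId points at the CorrelationId of the message that caused it. That is the difference in a nutshell: the ConversationId identifies the whole journey, and the CorrelationId/InitiatorId pair identifies individual links in it.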

The CorrelationId should be thought of as the identifier for a command or event, such that the effects of that command can be seen throughout the system logs.

It sounds to me like your input from HTTP might be the initiator. In that case, copy that identifier into InitiatorId and create a new CorrelationId for the message. Alternatively, you may want to just use the same identifier as the initial CorrelationId and let the subsequent messages pick it up as their InitiatorId.
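
A minimal sketch of those two options, assuming you stamp the headers yourself in the callback from the question. `FirstMessageIds` and its two methods are hypothetical helpers, not part of MassTransit; only the `CorrelationId` and `InitiatorId` header names come from the discussion above.

```csharp
using System;

// Hypothetical helper: picks the header values for the first message published
// on behalf of an HTTP request. Not part of MassTransit.
public static class FirstMessageIds
{
    // Option A: the HTTP identifier is the initiator, and the message gets its own CorrelationId.
    public static (Guid CorrelationId, Guid? InitiatorId) HttpIdAsInitiator(Guid httpId)
    {
        return (Guid.NewGuid(), httpId);
    }

    // Option B: the HTTP identifier is reused as the first message's CorrelationId,
    // so consumers downstream see it as the InitiatorId of the messages they receive next.
    public static (Guid CorrelationId, Guid? InitiatorId) HttpIdAsCorrelation(Guid httpId)
    {
        return (httpId, null);
    }
}

// Usage inside the question's callback (sendContext and correlationIdProvider come from that code):
//   var ids = FirstMessageIds.HttpIdAsInitiator(correlationIdProvider.GetCorrelationId());
//   sendContext.CorrelationId = ids.CorrelationId;
//   sendContext.InitiatorId = ids.InitiatorId;
```

Either way, the ConversationId is what ties every log entry for the journey together, so that is the value to log if you want one identifier per request.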

Chris Patterson answered Sep 27 '22 20:09
