Middleware with MassTransit publish

I have a .NET Core Web API application that uses MassTransit (to work with the RabbitMQ message broker). The RabbitMQ/MassTransit configuration is simple and takes only a few lines of code in Startup.cs.

services.AddMassTransit(x =>
        {
            x.AddConsumer<CustomLogConsume>();

            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri("rabbitmq://rabbitmq/"), h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                cfg.ExchangeType = ExchangeType.Fanout;

                cfg.ReceiveEndpoint(host, "ActionLog_Queue", e =>
                {
                    e.PrefetchCount = 16;
                });

                // or, configure the endpoints by convention
                cfg.ConfigureEndpoints(provider);
            }));
        });

I use dependency injection throughout the solution. Publishing messages works fine when the publisher is injected into a controller. But when I implemented a custom middleware to log actions, MassTransit failed to publish the message properly, and an additional queue with an _error suffix was created in the RabbitMQ web console.

public class RequestResponseLoggingMiddleware
{
    #region Private Variables

    /// <summary>
    /// RequestDelegate
    /// </summary>
    private readonly RequestDelegate _next;

    /// <summary>
    /// IActionLogPublish
    /// </summary>
    private readonly IActionLogPublish _logPublish;

    #endregion

    #region Constructor
    public RequestResponseLoggingMiddleware(RequestDelegate next, IActionLogPublish logPublish)
    {
        _next = next;
        _logPublish = logPublish;
    }
    #endregion

    #region PrivateMethods

    #region FormatRequest
    /// <summary>
    /// FormatRequest
    /// </summary>
    /// <param name="request"></param>
    /// <returns></returns>
    private async Task<ActionLog> FormatRequest(HttpRequest request)
    {
        ActionLog actionLog = new ActionLog();
        var body = request.Body;
        request.EnableRewind();

        var context = request.HttpContext;

        var buffer = new byte[Convert.ToInt32(request.ContentLength)];
        await request.Body.ReadAsync(buffer, 0, buffer.Length);
        var bodyAsText = Encoding.UTF8.GetString(buffer);
        request.Body = body;

        var injectedRequestStream = new MemoryStream();

        var requestLog = $"REQUEST HttpMethod: {context.Request.Method}, Path: {context.Request.Path}";

        using (var bodyReader = new StreamReader(context.Request.Body))
        {
            bodyAsText = bodyReader.ReadToEnd();

            if (string.IsNullOrWhiteSpace(bodyAsText) == false)
            {
                requestLog += $", Body : {bodyAsText}";
            }

            var bytesToWrite = Encoding.UTF8.GetBytes(bodyAsText);
            injectedRequestStream.Write(bytesToWrite, 0, bytesToWrite.Length);
            injectedRequestStream.Seek(0, SeekOrigin.Begin);
            context.Request.Body = injectedRequestStream;
        }

        actionLog.Request = $"{bodyAsText}";
        actionLog.RequestURL = $"{request.Scheme} {request.Host}{request.Path} {request.QueryString}";

        return actionLog;
    }
    #endregion

    #region FormatResponse
    private async Task<string> FormatResponse(HttpResponse response)
    {
        response.Body.Seek(0, SeekOrigin.Begin);
        var text = await new StreamReader(response.Body).ReadToEndAsync();
        response.Body.Seek(0, SeekOrigin.Begin);

        return $"Response {text}";
    }
    #endregion

    #endregion

    #region PublicMethods

    #region Invoke
    /// <summary>
    /// Invoke - Hits before executing any action. Actions call executes from _next(context)
    /// </summary>
    /// <param name="context"></param>
    /// <returns></returns>
    public async Task Invoke(HttpContext context)
    {
        ActionLog actionLog = new ActionLog();

        actionLog = await FormatRequest(context.Request);


        var originalBodyStream = context.Response.Body;

        using (var responseBody = new MemoryStream())
        {
            context.Response.Body = responseBody;

            await _next(context);

            actionLog.Response = await FormatResponse(context.Response);

            await _logPublish.Publish(actionLog);
            await responseBody.CopyToAsync(originalBodyStream);
        }
    }
    #endregion

    #endregion
}

The middleware is configured in Startup:

  public async void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        ............
        app.UseMiddleware<RequestResponseLoggingMiddleware>();
        ....................
    }

Is any additional configuration needed in Startup for MassTransit to work with middleware?

Edit

IActionLogPublish

public interface IActionLogPublish
{
    Task Publish(ActionLog model);
}

ActionLogPublish

public class ActionLogPublish : IActionLogPublish
{

    private readonly IBus _bus;

    public ActionLogPublish(IBus bus)
    {
        _bus = bus;
    }

    public async Task Publish(ActionLog actionLogData)
    {
        /* Publish values to RabbitMQ Service Bus */

        await _bus.Publish(actionLogData);

        /* Publish values to RabbitMQ Service Bus */
    }

}

Edit

RabbitMQ web console: [screenshot not preserved]

Asked by Ragesh S on Oct 09 '19 at 06:10


2 Answers

The middleware needs to put the original body back in the response.

Also, the injected dependency works with controllers but not with the middleware, probably because it is registered with a scoped lifetime.

In that case it should not be constructor-injected into the middleware but passed directly into the Invoke method.

Because middleware is constructed at app startup, not per-request, scoped lifetime services used by middleware constructors aren't shared with other dependency-injected types during each request. If you must share a scoped service between your middleware and other types, add these services to the Invoke method's signature. The Invoke method can accept additional parameters that are populated by DI:

//...omitted for brevity

public RequestResponseLoggingMiddleware(RequestDelegate next) {
    _next = next;
}

//...

private async Task<string> FormatResponseStream(Stream stream) {
    stream.Seek(0, SeekOrigin.Begin);
    var text = await new StreamReader(stream).ReadToEndAsync();
    stream.Seek(0, SeekOrigin.Begin);
    return $"Response {text}";
}

public async Task Invoke(HttpContext context, IActionLogPublish logger) {
    ActionLog actionLog = await FormatRequest(context.Request);
    //keep local copy of response stream
    var originalBodyStream = context.Response.Body;

    using (var responseBody = new MemoryStream()) {
        //replace stream for down stream calls
        context.Response.Body = responseBody;

        await _next(context);

        //put original stream back in the response object
        context.Response.Body = originalBodyStream; // <-- THIS IS IMPORTANT

        //Copy local stream to original stream
        responseBody.Position = 0;
        await responseBody.CopyToAsync(originalBodyStream);

        //custom logging
        actionLog.Response = await FormatResponseStream(responseBody);
        await logger.Publish(actionLog);
    }
}

Reference: Dependency injection in ASP.NET Core, Scoped service lifetime

When using a scoped service in a middleware, inject the service into the Invoke or InvokeAsync method. Don't inject via constructor injection because it forces the service to behave like a singleton. For more information, see Write custom ASP.NET Core middleware.

Emphasis mine

Answered by Nkosi on Oct 09 '22 at 01:10


It is hard to tell from the description exactly what error you are getting. The middleware implementation looks complicated and could itself be the source of the error; my guess is that the stream position is not being reset correctly somewhere. The corrections from @Nkosi may actually fix it.

Since you say that IBus works correctly from controllers, which are created per request, you may want to try implementing the IMiddleware interface in your middleware, as described in this doc.

public class RequestResponseLoggingMiddleware : IMiddleware
{
    IActionLogPublish logPublish;

    public RequestResponseLoggingMiddleware(IActionLogPublish logPublish)
    {
        this.logPublish = logPublish;
    }

    // ...

    public async Task InvokeAsync(HttpContext context, RequestDelegate next)
    {
        //...
    }

    //...
}

In this case the middleware is registered as a scoped or transient service and resolved for every request, the same as a controller. This may also fix your issue if it relates to scoped service resolution.
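For this approach the middleware type itself has to be registered in the container, otherwise it cannot be resolved per request. A minimal sketch of the Startup wiring follows; the scoped lifetime chosen for IActionLogPublish is an assumption, since the question does not show how it is registered, and the MassTransit setup from the question is omitted here.

```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        // Assumption: the publisher is registered as scoped.
        // The question does not show this registration.
        services.AddScoped<IActionLogPublish, ActionLogPublish>();

        // Middleware that implements IMiddleware must be registered
        // in the container so it can be resolved for each request.
        services.AddTransient<RequestResponseLoggingMiddleware>();
    }

    public void Configure(IApplicationBuilder app)
    {
        // Added to the pipeline the same way as conventional middleware.
        app.UseMiddleware<RequestResponseLoggingMiddleware>();
    }
}
```

With a transient registration a new middleware instance is created for every request, so a scoped dependency in its constructor is taken from that request's scope rather than captured once at startup.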

Answered by Andrii Litvinov on Oct 09 '22 at 00:10