I have a .NET Core Web API application that uses MassTransit as the client for the RabbitMQ message broker. The RabbitMQ/MassTransit configuration is simple and takes only a few lines of code in Startup.cs:
services.AddMassTransit(x =>
{
    x.AddConsumer<CustomLogConsume>();
    x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(new Uri("rabbitmq://rabbitmq/"), h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        cfg.ExchangeType = ExchangeType.Fanout;
        cfg.ReceiveEndpoint(host, "ActionLog_Queue", e =>
        {
            e.PrefetchCount = 16;
        });
        // or, configure the endpoints by convention
        cfg.ConfigureEndpoints(provider);
    }));
});
I use dependency injection throughout the solution. Publishing messages works fine from controllers that receive the publisher through dependency injection. But when I implemented a custom middleware to log actions, MassTransit failed to publish the message properly and created an additional queue with the _error suffix, which shows up in the RabbitMQ web console.
public class RequestResponseLoggingMiddleware
{
    #region Private Variables
    /// <summary>
    /// RequestDelegate
    /// </summary>
    private readonly RequestDelegate _next;
    /// <summary>
    /// IActionLogPublish
    /// </summary>
    private readonly IActionLogPublish _logPublish;
    #endregion

    #region Constructor
    public RequestResponseLoggingMiddleware(RequestDelegate next, IActionLogPublish logPublish)
    {
        _next = next;
        _logPublish = logPublish;
    }
    #endregion

    #region PrivateMethods
    #region FormatRequest
    /// <summary>
    /// FormatRequest
    /// </summary>
    /// <param name="request"></param>
    /// <returns></returns>
    private async Task<ActionLog> FormatRequest(HttpRequest request)
    {
        ActionLog actionLog = new ActionLog();
        var body = request.Body;
        request.EnableRewind();
        var context = request.HttpContext;
        var buffer = new byte[Convert.ToInt32(request.ContentLength)];
        await request.Body.ReadAsync(buffer, 0, buffer.Length);
        var bodyAsText = Encoding.UTF8.GetString(buffer);
        request.Body = body;
        var injectedRequestStream = new MemoryStream();
        var requestLog = $"REQUEST HttpMethod: {context.Request.Method}, Path: {context.Request.Path}";
        using (var bodyReader = new StreamReader(context.Request.Body))
        {
            bodyAsText = bodyReader.ReadToEnd();
            if (string.IsNullOrWhiteSpace(bodyAsText) == false)
            {
                requestLog += $", Body : {bodyAsText}";
            }
            var bytesToWrite = Encoding.UTF8.GetBytes(bodyAsText);
            injectedRequestStream.Write(bytesToWrite, 0, bytesToWrite.Length);
            injectedRequestStream.Seek(0, SeekOrigin.Begin);
            context.Request.Body = injectedRequestStream;
        }
        actionLog.Request = $"{bodyAsText}";
        actionLog.RequestURL = $"{request.Scheme} {request.Host}{request.Path} {request.QueryString}";
        return actionLog;
    }
    #endregion

    #region FormatResponse
    private async Task<string> FormatResponse(HttpResponse response)
    {
        response.Body.Seek(0, SeekOrigin.Begin);
        var text = await new StreamReader(response.Body).ReadToEndAsync();
        response.Body.Seek(0, SeekOrigin.Begin);
        return $"Response {text}";
    }
    #endregion
    #endregion

    #region PublicMethods
    #region Invoke
    /// <summary>
    /// Invoke - Hits before executing any action. Actions call executes from _next(context)
    /// </summary>
    /// <param name="context"></param>
    /// <returns></returns>
    public async Task Invoke(HttpContext context)
    {
        ActionLog actionLog = new ActionLog();
        actionLog = await FormatRequest(context.Request);
        var originalBodyStream = context.Response.Body;
        using (var responseBody = new MemoryStream())
        {
            context.Response.Body = responseBody;
            await _next(context);
            actionLog.Response = await FormatResponse(context.Response);
            await _logPublish.Publish(actionLog);
            await responseBody.CopyToAsync(originalBodyStream);
        }
    }
    #endregion
    #endregion
}
The middleware is configured in Startup:
public async void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
{
    ............
    app.UseMiddleware<RequestResponseLoggingMiddleware>();
    ....................
}
Is there any additional configuration needed in Startup for MassTransit to work with middleware?
Edit
IActionLogPublish
public interface IActionLogPublish
{
    Task Publish(ActionLog model);
}
ActionLogPublish
public class ActionLogPublish : IActionLogPublish
{
    private readonly IBus _bus;
    public ActionLogPublish(IBus bus)
    {
        _bus = bus;
    }
    public async Task Publish(ActionLog actionLogData)
    {
        /* Publish values to RabbitMQ Service Bus */
        await _bus.Publish(actionLogData);
        /* Publish values to RabbitMQ Service Bus */
    }
}
Edit
RabbitMQ Web Console (screenshot not preserved)
The middleware needs to put the original body stream back in the response.
Also, the injected dependency works fine in controllers but not in the middleware, possibly because it is registered with a scoped lifetime.
In that case it should not be constructor-injected into the middleware but passed directly into the Invoke method.
Because middleware is constructed at app startup, not per-request, scoped lifetime services used by middleware constructors aren't shared with other dependency-injected types during each request. If you must share a scoped service between your middleware and other types, add these services to the Invoke method's signature. The Invoke method can accept additional parameters that are populated by DI:
//...omitted for brevity
public RequestResponseLoggingMiddleware(RequestDelegate next) {
    _next = next;
}
//...
private async Task<string> FormatResponseStream(Stream stream) {
    stream.Seek(0, SeekOrigin.Begin);
    var text = await new StreamReader(stream).ReadToEndAsync();
    stream.Seek(0, SeekOrigin.Begin);
    return $"Response {text}";
}
public async Task Invoke(HttpContext context, IActionLogPublish logger) {
    ActionLog actionLog = await FormatRequest(context.Request);
    //keep local copy of response stream
    var originalBodyStream = context.Response.Body;
    using (var responseBody = new MemoryStream()) {
        //replace stream for down stream calls
        context.Response.Body = responseBody;
        await _next(context);
        //put original stream back in the response object
        context.Response.Body = originalBodyStream; // <-- THIS IS IMPORTANT
        //Copy local stream to original stream
        responseBody.Position = 0;
        await responseBody.CopyToAsync(originalBodyStream);
        //custom logging (FormatResponseStream rewinds the stream before reading)
        actionLog.Response = await FormatResponseStream(responseBody);
        await logger.Publish(actionLog);
    }
}
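To illustrate why the position is reset before copying, here is a minimal, self-contained sketch that uses only in-memory streams. `StreamCopyDemo` and `CopyBufferedAsync` are hypothetical names made up for this illustration and are not part of the middleware above; the point is only that `CopyToAsync` starts reading at the stream's current position.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class StreamCopyDemo
{
    // Writes `text` into a buffer stream, optionally rewinds it,
    // copies it to a second stream and returns what arrived there.
    public static async Task<string> CopyBufferedAsync(string text, bool rewind)
    {
        using (var buffer = new MemoryStream())
        using (var target = new MemoryStream())
        {
            var bytes = Encoding.UTF8.GetBytes(text);
            await buffer.WriteAsync(bytes, 0, bytes.Length); // position is now at the end

            if (rewind)
            {
                buffer.Position = 0; // same idea as responseBody.Position = 0
            }

            await buffer.CopyToAsync(target); // copies from the current position onward
            return Encoding.UTF8.GetString(target.ToArray());
        }
    }

    public static async Task Main()
    {
        Console.WriteLine($"without rewind: '{await CopyBufferedAsync("hello", false)}'");
        Console.WriteLine($"with rewind:    '{await CopyBufferedAsync("hello", true)}'");
    }
}
```

Without the rewind nothing is copied, which in the middleware would mean an empty response body reaching the client.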
When using a scoped service in a middleware, inject the service into the Invoke or InvokeAsync method. Don't inject via constructor injection because it forces the service to behave like a singleton. For more information, see Write custom ASP.NET Core middleware.
Emphasis mine
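The "behaves like a singleton" effect can be imitated without any DI container. The sketch below is only a simulation under stated assumptions: `ScopedPublisher`, `CtorInjectedMiddleware` and `MethodInjectedMiddleware` are hypothetical stand-ins, and the `new ScopedPublisher()` calls play the role of the container resolving a service for each request.

```csharp
using System;

// Hypothetical stand-in for a scoped service such as IActionLogPublish.
public sealed class ScopedPublisher
{
    private static int _created;
    public int Id { get; } = ++_created; // each new instance gets a new id
}

// Constructor injection: the instance is captured once, when the middleware is built.
public sealed class CtorInjectedMiddleware
{
    private readonly ScopedPublisher _publisher;
    public CtorInjectedMiddleware(ScopedPublisher publisher) => _publisher = publisher;
    public int Invoke() => _publisher.Id;
}

// Method injection: the caller supplies an instance on every invocation.
public sealed class MethodInjectedMiddleware
{
    public int Invoke(ScopedPublisher publisher) => publisher.Id;
}

public static class LifetimeDemo
{
    public static void Main()
    {
        var ctor = new CtorInjectedMiddleware(new ScopedPublisher());
        // Two "requests" see the same instance.
        Console.WriteLine(ctor.Invoke() == ctor.Invoke());

        var method = new MethodInjectedMiddleware();
        // Two "requests" each see their own instance.
        Console.WriteLine(method.Invoke(new ScopedPublisher()) == method.Invoke(new ScopedPublisher()));
    }
}
```

The first comparison is true and the second is false, which mirrors what happens when the real container hands a scoped service to a middleware constructor versus to Invoke.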
It is hard to tell from the description exactly what error you are getting. The middleware implementation looks complicated and could itself be the source of the error. My guess is that the stream position is not being set correctly somewhere. The corrections from @Nkosi may actually fix it.
If you say that IBus works correctly from controllers, which are created per request, you may want to try implementing the IMiddleware interface in your middleware, as described in this doc.
public class RequestResponseLoggingMiddleware : IMiddleware
{
    IActionLogPublish logPublish;
    public RequestResponseLoggingMiddleware(IActionLogPublish logPublish)
    {
        this.logPublish = logPublish;
    }
    // ...
    public async Task InvokeAsync(HttpContext context, RequestDelegate next)
    {
        //...
    }
    //...
}
In this case the middleware itself has to be registered in the service container as a scoped or transient service, and it is then resolved for every request, the same as a controller. That may also fix your issue if it is related to scoped service resolution.
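A rough sketch of the Startup wiring for the IMiddleware variant follows. It is a configuration fragment rather than a complete program, and the `AddScoped<IActionLogPublish, ActionLogPublish>()` line is an assumption: the question does not show how the publisher is actually registered.

```csharp
public void ConfigureServices(IServiceCollection services)
{
    // Assumed registration; the question does not show this line.
    services.AddScoped<IActionLogPublish, ActionLogPublish>();

    // Middleware implementing IMiddleware must be registered explicitly,
    // otherwise UseMiddleware fails when the type is resolved.
    services.AddTransient<RequestResponseLoggingMiddleware>();
}

public void Configure(IApplicationBuilder app)
{
    app.UseMiddleware<RequestResponseLoggingMiddleware>();
}
```

With this registration the middleware's constructor dependencies are resolved per request, so a scoped publisher is no longer captured at startup.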