I've created an ASP.NET Core MVC/WebApi site that has a RabbitMQ subscriber based off James Still's blog article Real-World PubSub Messaging with RabbitMQ.
In his article he uses a static class to start the queue subscriber and define the event handler for queued events. This static method then instantiates the event handler classes via a static factory class.
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; namespace NST.Web.MessageProcessing { public static class MessageListener { private static IConnection _connection; private static IModel _channel; public static void Start(string hostName, string userName, string password, int port) { var factory = new ConnectionFactory { HostName = hostName, Port = port, UserName = userName, Password = password, VirtualHost = "/", AutomaticRecoveryEnabled = true, NetworkRecoveryInterval = TimeSpan.FromSeconds(15) }; _connection = factory.CreateConnection(); _channel = _connection.CreateModel(); _channel.ExchangeDeclare(exchange: "myExchange", type: "direct", durable: true); var queueName = "myQueue"; QueueDeclareOk ok = _channel.QueueDeclare(queueName, true, false, false, null); _channel.QueueBind(queue: queueName, exchange: "myExchange", routingKey: "myRoutingKey"); var consumer = new EventingBasicConsumer(_channel); consumer.Received += ConsumerOnReceived; _channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); } public static void Stop() { _channel.Close(200, "Goodbye"); _connection.Close(); } private static void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea) { // get the details from the event var body = ea.Body; var message = Encoding.UTF8.GetString(body); var messageType = "endpoint"; // hardcoding the message type while we dev... // instantiate the appropriate handler based on the message type IMessageProcessor processor = MessageHandlerFactory.Create(messageType); processor.Process(message); // Ack the event on the queue IBasicConsumer consumer = (IBasicConsumer)sender; consumer.Model.BasicAck(ea.DeliveryTag, false); } } }
It works great up to the point where I now need to resolve a service in my message processor factory rather than just write to the console.
using NST.Web.Services; using System; namespace NST.Web.MessageProcessing { public static class MessageHandlerFactory { public static IMessageProcessor Create(string messageType) { switch (messageType.ToLower()) { case "ipset": // need to resolve IIpSetService here... IIpSetService ipService = ??????? return new IpSetMessageProcessor(ipService); case "endpoint": // need to resolve IEndpointService here... IEndpointService epService = ??????? // create new message processor return new EndpointMessageProcessor(epService); default: throw new Exception("Unknown message type"); } } } }
Is there any way to access the ASP.NET Core IoC container to resolve the dependencies? I don't really want to have to spin up the whole stack of dependencies by hand :(
Or, is there a better way to subscribe to RabbitMQ from an ASP.NET Core application? I found RestBus but it's not been updated for Core 1.x
The IServiceProvider is responsible for resolving instances of types at runtime, as required by the application. These instances can be injected into other services resolved from the same dependency injection container. The ServiceProvider ensures that resolved services live for the expected lifetime.
The AddScoped method registers the service with a scoped lifetime, the lifetime of a single request. Service lifetimes are described later in this topic. By using the DI pattern, the controller: Doesn't use the concrete type MyDependency , only the IMyDependency interface it implements.
ASP.NET Core contains a built-in dependency injection mechanism. In the Startup. cs file, there is a method called ConfigureServices which registers all application services in the IServiceCollection parameter. The collection is managed by the Microsoft.
You can avoid the static classes and use Dependency Injection all the way through combined with:
IApplicationLifetime
to start/stop the listener whenever the application starts/stops.IServiceProvider
to create instances of the message processors.First thing, let's move the configuration to its own class that can be populated from the appsettings.json:
public class RabbitOptions { public string HostName { get; set; } public string UserName { get; set; } public string Password { get; set; } public int Port { get; set; } } // In appsettings.json: { "Rabbit": { "hostName": "192.168.99.100", "username": "guest", "password": "guest", "port": 5672 } }
Next, convert MessageHandlerFactory
into a non-static class that receives an IServiceProvider
as a dependency. It will use the service provider to resolve the message processor instances:
public class MessageHandlerFactory { private readonly IServiceProvider services; public MessageHandlerFactory(IServiceProvider services) { this.services = services; } public IMessageProcessor Create(string messageType) { switch (messageType.ToLower()) { case "ipset": return services.GetService<IpSetMessageProcessor>(); case "endpoint": return services.GetService<EndpointMessageProcessor>(); default: throw new Exception("Unknown message type"); } } }
This way your message processor classes can receive in the constructor any dependencies they need (as long as you configure them in Startup.ConfigureServices
). For example, I am injecting an ILogger into one of my sample processors:
public class IpSetMessageProcessor : IMessageProcessor { private ILogger<IpSetMessageProcessor> logger; public IpSetMessageProcessor(ILogger<IpSetMessageProcessor> logger) { this.logger = logger; } public void Process(string message) { logger.LogInformation("Received message: {0}", message); } }
Now convert MessageListener
into a non-static class that depends on IOptions<RabbitOptions>
and MessageHandlerFactory
.It's very similar to your original one, I just replaced the parameters of the Start methods with the options dependency and the handler factory is now a dependency instead of a static class:
public class MessageListener { private readonly RabbitOptions opts; private readonly MessageHandlerFactory handlerFactory; private IConnection _connection; private IModel _channel; public MessageListener(IOptions<RabbitOptions> opts, MessageHandlerFactory handlerFactory) { this.opts = opts.Value; this.handlerFactory = handlerFactory; } public void Start() { var factory = new ConnectionFactory { HostName = opts.HostName, Port = opts.Port, UserName = opts.UserName, Password = opts.Password, VirtualHost = "/", AutomaticRecoveryEnabled = true, NetworkRecoveryInterval = TimeSpan.FromSeconds(15) }; _connection = factory.CreateConnection(); _channel = _connection.CreateModel(); _channel.ExchangeDeclare(exchange: "myExchange", type: "direct", durable: true); var queueName = "myQueue"; QueueDeclareOk ok = _channel.QueueDeclare(queueName, true, false, false, null); _channel.QueueBind(queue: queueName, exchange: "myExchange", routingKey: "myRoutingKey"); var consumer = new EventingBasicConsumer(_channel); consumer.Received += ConsumerOnReceived; _channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); } public void Stop() { _channel.Close(200, "Goodbye"); _connection.Close(); } private void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea) { // get the details from the event var body = ea.Body; var message = Encoding.UTF8.GetString(body); var messageType = "endpoint"; // hardcoding the message type while we dev... //var messageType = Encoding.UTF8.GetString(ea.BasicProperties.Headers["message-type"] as byte[]); // instantiate the appropriate handler based on the message type IMessageProcessor processor = handlerFactory.Create(messageType); processor.Process(message); // Ack the event on the queue IBasicConsumer consumer = (IBasicConsumer)sender; consumer.Model.BasicAck(ea.DeliveryTag, false); } }
Almost there, you will need to update the Startup.ConfigureServices
method so it knows about your services and options (You can create interfaces for the listener and handler factory if you want):
public void ConfigureServices(IServiceCollection services) { // ... // Add RabbitMQ services services.Configure<RabbitOptions>(Configuration.GetSection("rabbit")); services.AddTransient<MessageListener>(); services.AddTransient<MessageHandlerFactory>(); services.AddTransient<IpSetMessageProcessor>(); services.AddTransient<EndpointMessageProcessor>(); }
Finally, update the Startup.Configure
method to take an extra IApplicationLifetime
parameter and start/stop the message listener in the ApplicationStarted
/ApplicationStopped
events (Although I noticed a while ago some issues with the ApplicationStopping event using IISExpress, as in this question):
public MessageListener MessageListener { get; private set; } public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IApplicationLifetime appLifetime) { appLifetime.ApplicationStarted.Register(() => { MessageListener = app.ApplicationServices.GetService<MessageListener>(); MessageListener.Start(); }); appLifetime.ApplicationStopping.Register(() => { MessageListener.Stop(); }); // ... }
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With