Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

MassTransit - Can Multiple Consumers All Receive Same Message?

I have one .NET 4.5.2 Service Publishing messages to RabbitMq via MassTransit.

And multiple instances of a .NET Core 2.1 Service Consuming those messages.

At the moment competing instances of the .NET core consumer service steal messages from the others.

i.e. The first one to consume the message takes it off the queue and the rest of the service instances don't get to consume it.

I want ALL instances to consume the same message.

How can I achieve this?

Publisher Service is configured as follows:

 builder.Register(context =>
            {
                MessageCorrelation.UseCorrelationId<MyWrapper>(x => x.CorrelationId);

                return Bus.Factory.CreateUsingRabbitMq(configurator =>
                {
                    configurator.Host(new Uri("rabbitmq://localhost:5671"), host =>
                    {
                        host.Username(***);
                        host.Password(***);
                    });
                    configurator.Message<MyWrapper>(x => { x.SetEntityName("my.exchange"); });
                    configurator.Publish<MyWrapper>(x =>
                    {
                        x.AutoDelete = true;
                        x.Durable = true;
                        x.ExchangeType = true;
                    });

                });
            })
            .As<IBusControl>()
            .As<IBus>()
            .SingleInstance();

And the .NET Core Consumer Services are configured as follows:

        serviceCollection.AddScoped<MyWrapperConsumer>();

        serviceCollection.AddMassTransit(serviceConfigurator =>
        {
            serviceConfigurator.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri("rabbitmq://localhost:5671"), hostConfigurator =>
                {
                    hostConfigurator.Username(***);
                    hostConfigurator.Password(***);

                });
                cfg.ReceiveEndpoint(host, "my.exchange", exchangeConfigurator =>
                {
                    exchangeConfigurator.AutoDelete = true;
                    exchangeConfigurator.Durable = true;
                    exchangeConfigurator.ExchangeType = "topic";
                    exchangeConfigurator.Consumer<MyWrapperConsumer>(provider);
                });
            }));
        });
        serviceCollection.AddSingleton<IHostedService, BusService>();

And then MyWrapperConsumer looks like this:

public class MyWrapperConsumer :
    IConsumer<MyWrapper>
{
    .
    .

    public MyWrapperConsumer(...) => (..) = (..);

    public async Task Consume(ConsumeContext<MyWrapper> context)
    {
        //Do Stuff 
    }
}
like image 314
fourbeatcoder Avatar asked Jul 25 '19 20:07

fourbeatcoder


People also ask

Does RabbitMQ support multiple consumers?

No it's not, single queue/multiple consumers with each consumer handling the same message ID isn't possible. Having the exchange route the message onto into two separate queues is indeed better.

What is the difference between MassTransit and RabbitMQ?

Broker Topology With RabbitMQ, which supports exchanges and queues, messages are sent or published to exchanges and RabbitMQ routes those messages through exchanges to the appropriate queues. When the bus is started, MassTransit will create exchanges and queues on the virtual host for the receive endpoint.

What is skipped queue in RabbitMQ?

A queue may contain more than one message type, the message type is used to deliver the message to the appropriate consumer configured on the receive endpoint. If a received message is not handled by a consumer, the skipped message will be moved to a skipped queue, which is named with a _skipped suffix.


1 Answers

It sounds like you want to publish messages and have multiple consumer service instances receive them. In that case, each service instance needs to have its own queue. That way, every published message will result in a copy being delivered to each queue. Then, each receive endpoint will read that message from its own queue and consume it.

All that excessive configuration you're doing is going against what you want. To make it work, remove all that exchange type configuration, and just configure each service instance with a unique queue name (you can generate it from host, machine, whatever) and just call Publish on the message producer.

You can see how RabbitMQ topology is configured: https://masstransit-project.com/advanced/topology/rabbitmq.html

like image 67
Chris Patterson Avatar answered Oct 24 '22 06:10

Chris Patterson