
How to send various command types with MassTransit and RabbitMQ?

I'm a beginner with message brokers.
We have a ticketing service that consists of multiple sub-services. A supervisor service receives requests through a web API and sends them to the sub-services.
Every request has a header that is used to detect the command type (such as Reserve, Refund, Availability, etc.). We use JSON for serializing objects.
How can a publisher, such as our supervisor system, send various message types (different objects) with MassTransit in a way that the consumer can use them easily?
In general, is it possible to send various message types with MassTransit and RabbitMQ?
Every consumer has only one queue for processing received messages.

Thanks

Update

I read the posts starting at the link below, which are a good introduction to messaging with MassTransit, but neither they nor other resources show an example that uses several message types:

https://dotnetcodr.com/2016/08/02/messaging-with-rabbitmq-and-net-review-part-1-foundations-and-terminology/

I have multiple commands and need a different message type for each of them, but the examples use only a single message type, as below:

Sender

    private static void RunMassTransitPublisherWithRabbit()
    {
        string rabbitMqAddress = "rabbitmq://localhost:5672/Ticket";
        string rabbitMqQueue = "mycompany.domains.queues";
        Uri rabbitMqRootUri = new Uri(rabbitMqAddress);

        IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
        {
            rabbit.Host(rabbitMqRootUri, settings =>
            {
                settings.Password("Kalcho^Milano");
                settings.Username("ticketadmin");
            });
        });

        Task<ISendEndpoint> sendEndpointTask = rabbitBusControl.GetSendEndpoint(new Uri(string.Concat(rabbitMqAddress, "/", rabbitMqQueue)));
        ISendEndpoint sendEndpoint = sendEndpointTask.Result;

        Task sendTask = sendEndpoint.Send<IRegisterCustomer>(new
        {
            Address = "New Street",
            Id = Guid.NewGuid(),
            Preferred = true,
            RegisteredUtc = DateTime.UtcNow,
            Name = "Nice people LTD",
            Type = 1,
            DefaultDiscount = 0
        });
        Console.ReadKey();
    }

Receiver

    private static void RunMassTransitReceiverWithRabbit()
    {
        IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
        {
            IRabbitMqHost rabbitMqHost = rabbit.Host(new Uri("rabbitmq://localhost:5672/Ticket"), settings =>
            {
                settings.Password("Kalcho^Milano");
                settings.Username("ticketadmin");
            });

            rabbit.ReceiveEndpoint(rabbitMqHost, "mycompany.domains.queues", conf =>
            {
                conf.Consumer<RegisterCustomerConsumer>();
            });
        });

        rabbitBusControl.Start();
        Console.ReadKey();

        rabbitBusControl.Stop();
    }

IRegisterCustomer is an interface, and I can only get the message content in rabbit.ReceiveEndpoint and convert it to a usable object.

How can I use various message types, such as IReserveTicket, IRefundTicket and IGetAvailability, for sending and receiving messages?

Thanks again

Mohammad reza Beizavi asked Apr 15 '17 09:04


1 Answer

If you add more consumers to your endpoint, like this:

rabbit.ReceiveEndpoint(rabbitMqHost, "mycompany.domains.queues", conf =>
{
    conf.Consumer<RegisterCustomerConsumer>();
    conf.Consumer<ReserveTicketConsumer>();
    conf.Consumer<RefundTicketConsumer>();
});

And send messages like this:

await endpoint.Send<IReserveTicket>(new { TicketId = 123 });

It will just work.
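
For this to work, each consumer has to declare the message type it handles. The following is only a minimal sketch of what the contracts and consumers might look like, not the asker's actual code: the property name TicketId and the consumer bodies are assumptions made for illustration, while IConsumer<T> and ConsumeContext<T> are the standard MassTransit types.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contracts; the TicketId property is an assumption.
public interface IReserveTicket
{
    int TicketId { get; }
}

public interface IRefundTicket
{
    int TicketId { get; }
}

// Each consumer is bound to one message type through IConsumer<T>.
// MassTransit routes a message to the consumer whose T matches the
// type the message was sent as.
public class ReserveTicketConsumer : IConsumer<IReserveTicket>
{
    public Task Consume(ConsumeContext<IReserveTicket> context)
    {
        // context.Message is already deserialized into the contract type.
        return Console.Out.WriteLineAsync(
            $"Reserving ticket {context.Message.TicketId}");
    }
}

public class RefundTicketConsumer : IConsumer<IRefundTicket>
{
    public Task Consume(ConsumeContext<IRefundTicket> context)
    {
        return Console.Out.WriteLineAsync(
            $"Refunding ticket {context.Message.TicketId}");
    }
}
```

With consumers written this way there is no need to inspect a header to detect the command type; the message type itself selects the consumer.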

The above solution assumes you do not have a heavy load, especially an unequal load where you get millions of messages of one type and perhaps hundreds of the other types. Having all of them in one endpoint will create a consumption imbalance, since there is only one queue for all those consumers. In that case, nothing stops you from defining as many endpoints as you need, each with its own queue. For example:

cfg.ReceiveEndpoint(rabbitMqHost, "mycompany.domains.lowvolume", 
    c =>
    {
        c.Consumer<RegisterCustomerConsumer>();
        c.Consumer<RefundTicketConsumer>();
    });
cfg.ReceiveEndpoint(rabbitMqHost, "mycompany.domains.highvolume", 
    c => c.Consumer<ReserveTicketConsumer>());

Just remember that since you have different queues, you need to use these addresses to get send endpoints.
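
As a rough sketch of that last point, the sender could resolve one send endpoint per queue and pick the endpoint by command type. The base address is the one from the question and the queue names are the ones from the example above; the TicketSender class, the method name, and the TicketId property are hypothetical and only serve the illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical contracts, repeated here so the sketch is self-contained.
public interface IReserveTicket { int TicketId { get; } }
public interface IRefundTicket { int TicketId { get; } }

public static class TicketSender
{
    // Host address and virtual host as used in the question.
    private const string BaseAddress = "rabbitmq://localhost:5672/Ticket";

    public static async Task SendCommandsAsync(IBus bus)
    {
        // One send endpoint per queue; the address is the host address
        // followed by the queue name.
        ISendEndpoint lowVolume = await bus.GetSendEndpoint(
            new Uri(BaseAddress + "/mycompany.domains.lowvolume"));
        ISendEndpoint highVolume = await bus.GetSendEndpoint(
            new Uri(BaseAddress + "/mycompany.domains.highvolume"));

        // Each command goes to the queue whose endpoint hosts its consumer.
        await lowVolume.Send<IRefundTicket>(new { TicketId = 123 });
        await highVolume.Send<IReserveTicket>(new { TicketId = 456 });
    }
}
```

Awaiting GetSendEndpoint and Send, rather than reading .Result, avoids blocking the calling thread and surfaces any send failure as an exception.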

Alexey Zimarev answered Sep 20 '22 14:09