Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Competing Consumers in Mass Transit with RabbitMQ

I've implemented a simple publisher/consumer set with MassTransit, and I want to have the consumers read the messages from the same queue. However, when I run it, I see a large portion of the messages sent to the error queue instead of being consumed. From the discussions I've seen (SO, Forum), this should be really really simple with RabbitMQ (just point to the same queue), but it's not working. Is there an additional configuration that should be set?

Here's My Publisher

public class YourMessage { public string Text { get; set; } }
public class Program
{
    public static void Main()
    {
        Console.WriteLine("Publisher");
        Bus.Initialize(sbc =>
        {
            sbc.UseRabbitMqRouting();
            sbc.ReceiveFrom("rabbitmq://localhost/test_queue");
        });
        var x = Console.Read();
        for (var i = 0; i <= 1000; i++)
        {
            Console.WriteLine("Message Number " + i);
            Bus.Instance.Publish(new YourMessage { "Message Number " + i });
        }
    }
}

And My Consumer

public class YourMessage { public string Text { get; set; } }
public class Program
{
    public static void Main()
    {
        Console.WriteLine("Consumer");
        Bus.Initialize(sbc =>
        {
            sbc.UseRabbitMqRouting();
            sbc.ReceiveFrom("rabbitmq://localhost/test_queue");
            sbc.Subscribe(subs =>
            {
                var del = new Action<IConsumeContext<YourMessage>,YourMessage>((context, msg) =>
                {
                    Console.WriteLine(msg.Text);
                });
                subs.Handler<YourMessage>(del);
            });
        });
        while (true) { }
    }
}
like image 998
JoshRivers Avatar asked May 08 '12 16:05

JoshRivers


People also ask

What is MassTransit RabbitMQ?

What is MassTransit? MassTransit is a free, open-source, distributed application framework for . NET applications. It abstracts away the underlying logic required to work with message brokers, such as RabbitMQ, making it easier to create message-based, loosely coupled applications.

What is difference between RabbitMQ and MassTransit?

Broker TopologyWith RabbitMQ, which supports exchanges and queues, messages are sent or published to exchanges and RabbitMQ routes those messages through exchanges to the appropriate queues. When the bus is started, MassTransit will create exchanges and queues on the virtual host for the receive endpoint.

What is skipped queue in RabbitMQ?

A queue may contain more than one message type, the message type is used to deliver the message to the appropriate consumer configured on the receive endpoint. If a received message is not handled by a consumer, the skipped message will be moved to a skipped queue, which is named with a _skipped suffix.


2 Answers

The receiver/consumer and publisher cannot be on the same queue. If you want competing consumers have multiple instances of the consumer running against the same queue.

We have documentation, but this section is currently lacking, so I understand your confusion: http://readthedocs.org/docs/masstransit/en/latest/configuration/gotchas.html#how-to-setup-a-competing-consumer If you succeed, help with the documentation would be wonderful.

like image 100
Travis Avatar answered Nov 04 '22 10:11

Travis


So, it looks like the solution was to change the line in the publisher:

 sbc.ReceiveFrom("rabbitmq://localhost/test_queue"); 

To something like:

 sbc.ReceiveFrom("rabbitmq://localhost/test_queue_publisher"); 

This prevented the publishers from competing for messages they weren't configured to consume.

like image 43
JoshRivers Avatar answered Nov 04 '22 12:11

JoshRivers