I've implemented a simple publisher/consumer pair with MassTransit, and I want multiple consumers to read messages from the same queue. However, when I run it, I see a large portion of the messages sent to the error queue instead of being consumed. From the discussions I've seen (SO, Forum), this should be really simple with RabbitMQ (just point to the same queue), but it's not working. Is there additional configuration that should be set?
Here's my publisher:
using System;
using MassTransit;

public class YourMessage { public string Text { get; set; } }

public class Program
{
    public static void Main()
    {
        Console.WriteLine("Publisher");
        Bus.Initialize(sbc =>
        {
            sbc.UseRabbitMqRouting();
            sbc.ReceiveFrom("rabbitmq://localhost/test_queue");
        });

        // Wait for a key press before publishing.
        var x = Console.Read();
        for (var i = 0; i <= 1000; i++)
        {
            Console.WriteLine("Message Number " + i);
            Bus.Instance.Publish(new YourMessage { Text = "Message Number " + i });
        }
    }
}
And my consumer:
using System;
using MassTransit;

public class YourMessage { public string Text { get; set; } }

public class Program
{
    public static void Main()
    {
        Console.WriteLine("Consumer");
        Bus.Initialize(sbc =>
        {
            sbc.UseRabbitMqRouting();
            sbc.ReceiveFrom("rabbitmq://localhost/test_queue");
            sbc.Subscribe(subs =>
            {
                // Print the text of every YourMessage that arrives on the queue.
                var del = new Action<IConsumeContext<YourMessage>, YourMessage>((context, msg) =>
                {
                    Console.WriteLine(msg.Text);
                });
                subs.Handler<YourMessage>(del);
            });
        });

        // Keep the process alive so the bus can keep receiving.
        while (true) { }
    }
}
What is MassTransit? MassTransit is a free, open-source, distributed application framework for .NET applications. It abstracts away the underlying logic required to work with message brokers, such as RabbitMQ, making it easier to create message-based, loosely coupled applications.
Broker Topology: With RabbitMQ, which supports exchanges and queues, messages are sent or published to exchanges, and RabbitMQ routes those messages through exchanges to the appropriate queues. When the bus is started, MassTransit will create exchanges and queues on the virtual host for the receive endpoint.
A queue may contain more than one message type; the message type is used to deliver the message to the appropriate consumer configured on the receive endpoint. If a received message is not handled by a consumer, the skipped message will be moved to a skipped queue, which is named with a _skipped suffix.
The receiver/consumer and publisher cannot be on the same queue. If you want competing consumers, run multiple instances of the consumer against the same queue.
We have documentation, but this section is currently lacking, so I understand your confusion: http://readthedocs.org/docs/masstransit/en/latest/configuration/gotchas.html#how-to-setup-a-competing-consumer If you succeed, help with the documentation would be wonderful.
So, it looks like the solution was to change the line in the publisher:
sbc.ReceiveFrom("rabbitmq://localhost/test_queue");
To something like:
sbc.ReceiveFrom("rabbitmq://localhost/test_queue_publisher");
This prevented the publishers from competing for messages they weren't configured to consume.
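For reference, here is a minimal sketch of the publisher with that one change applied. It assumes the same MassTransit version and RabbitMQ transport package as the question and uses only the calls already shown there; the queue name test_queue_publisher is just an example, and any name not shared with the consumers should do. The consumer code stays as it was, and several instances of it can run against test_queue.

```csharp
using System;
using MassTransit;

public class YourMessage { public string Text { get; set; } }

public class Program
{
    public static void Main()
    {
        Console.WriteLine("Publisher");
        Bus.Initialize(sbc =>
        {
            sbc.UseRabbitMqRouting();
            // The publisher listens on its own queue, so it no longer
            // takes messages off test_queue that it has no handler for.
            sbc.ReceiveFrom("rabbitmq://localhost/test_queue_publisher");
        });

        for (var i = 0; i <= 1000; i++)
        {
            Bus.Instance.Publish(new YourMessage { Text = "Message Number " + i });
        }
    }
}
```

With the publisher moved off the shared queue, only processes that subscribe a handler for YourMessage read from test_queue.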