I am trying to implement a publish only bus in MassTransit v3 with C# and RabbitMQ where the bus has no consumer. The concept is messages will be published and queued, then a separate microservice will consume messages from the queue. Looking at this SO answer, receive endpoints must be specified so that messages are actually queued. However this appears to contradict the common gotchas in the MassTransit docs, which states If you need to only send or publish messages, don’t create any receive endpoints
.
Here is some sample code:
public class Program
{
static void Main(string[] args)
{
var bus = BusConfigurator.ConfigureBus();
bus.Start();
bus.Publish<IItemToQueue>(new ItemToQueue { Text = "Hello World" }).Wait();
Console.ReadKey();
bus.Stop();
}
}
public static class BusConfigurator
{
public static IBusControl ConfigureBus()
{
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), hst =>
{
hst.Username("guest");
hst.Password("guest");
});
cfg.ReceiveEndpoint(host, "queuename", e =>
{
e.Consumer<MyConsumer>();
});
});
return bus;
}
}
public interface IItemToQueue
{
string Text { get; set; }
}
public class ItemToQueue : IItemToQueue
{
public string Text { get; set; }
}
public class MyConsumer : IConsumer<IItemToQueue>
{
public async Task Consume(ConsumeContext<IItemToQueue> context)
{
await Console.Out.WriteLineAsync(context.Message.Text);
}
}
In this sample, I receive the message in the RabbitMQ queue as expected, and this is consumed by MyConsumer
which writes Hello World to the console and the message is then removed from the Queue.
However, when I remove the following code from the above and re-run the sample:
cfg.ReceiveEndpoint(host, RabbitMqConstants.ValidationQueue, e =>
{
e.Consumer<MyConsumer>();
});
A temporary queue is created (with a generated name) and the message never seems to be placed into the temporary queue. This queue is then removed when the bus is stopped.
The problem I have is with a ReceiveEndpoint specified, the messages will be consumed and removed from the queue in the publisher program (meaning the consumer microservice wouldn't process queued items). Without a RecieveEndpoint specified, a temporary queue is used (and the consumer microservice would not know the name of this temporary queue), the message never seems to get queued and the queue is deleted when the bus is stopped which wouldn't be good if the program went down.
There is an example of a send only bus in the MassTransit docs but it is pretty basic so I was wondering if anyone had any suggestions?
The receive endpoint should be in your service, separate from the publish-only application. That way, the service will have the receive endpoint and consume the messages as they are published by the application.
If you have the receive endpoint in the application, the application will consume the messages since it has the same queue name specified in the receive endpoint.
What you need to do is create another service, with the same configuration (including the receive endpoint) - and take the receive endpoint out of your application. At that point, the service will have the receive endpoint and consume the messages from the queue. Even if the service is stopped, the messages will continue to be delivered to the queue and once the service is started they will begin consuming.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With