I'm new to MassTransit, and I'm missing something in my understanding.
Let's say I have a server farm where all nodes can do the same job. The application framework is CQRS-style. That means I have two basic kinds of message to publish: messages that must be handled by only one server, and messages that must reach all servers.
I have built an extremely simple MassTransit prototype (a console application that sends a hello message every X seconds).
In the API, I can see there is a "publish" method. How can I specify what kind of message it is (one server versus all servers)?
If I look at the "handler" configuration, I can specify the queue URI. If I specify the same queue for all hosts, all hosts will get the message, but I cannot limit the execution to only one server.
If I listen on a host-dedicated queue, only one server will handle the messages, but then I don't know how to broadcast the other kind of message.
Please help me understand what I'm missing.
PS: in case it matters, my messaging system is RabbitMQ.
To test this, I created a common class library with these classes:
public static class ActualProgram
{
    private static readonly CancellationTokenSource g_Shutdown = new CancellationTokenSource();
    private static readonly Random g_Random = new Random();

    public static void ActualMain(int delay, int instanceName)
    {
        Thread.Sleep(delay);
        SetupBus(instanceName);
        Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);
        Console.WriteLine("Press enter at any time to exit");
        Console.ReadLine();
        g_Shutdown.Cancel();
        Bus.Shutdown();
    }

    private static void PublishRandomMessage()
    {
        Bus.Instance.Publish(new Message
        {
            Id = g_Random.Next(),
            Body = "Some message",
            Sender = Assembly.GetEntryAssembly().GetName().Name
        });
        if (!g_Shutdown.IsCancellationRequested)
        {
            Thread.Sleep(g_Random.Next(500, 10000));
            Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);
        }
    }

    private static void SetupBus(int instanceName)
    {
        Bus.Initialize(sbc =>
        {
            sbc.UseRabbitMqRouting();
            sbc.ReceiveFrom("rabbitmq://localhost/simple" + instanceName);
            sbc.Subscribe(subs =>
            {
                subs.Handler<Message>(MessageHandled);
            });
        });
    }

    private static void MessageHandled(Message msg)
    {
        ConsoleColor color = ConsoleColor.Red;
        switch (msg.Sender)
        {
            case "test_app1":
                color = ConsoleColor.Green;
                break;
            case "test_app2":
                color = ConsoleColor.Blue;
                break;
            case "test_app3":
                color = ConsoleColor.Yellow;
                break;
        }
        Console.ForegroundColor = color;
        Console.WriteLine(msg.ToString());
        Console.ResetColor();
    }

    private static void MessageConsumed(Message msg)
    {
        Console.WriteLine(msg.ToString());
    }
}

public class Message
{
    public long Id { get; set; }
    public string Sender { get; set; }
    public string Body { get; set; }

    public override string ToString()
    {
        return string.Format("[{0}] {1} : {2}" + Environment.NewLine, Id, Sender, Body);
    }
}
I also have 3 console applications that just run the ActualMain method:
internal class Program
{
    private static void Main(string[] args)
    {
        ActualProgram.ActualMain(0, 1);
    }
}
What you want is known as Competing Consumers (search SO for that and you'll find more info). Using RabbitMQ makes life easy: all you need to do is specify the same queue name for each consumer you start, instead of generating a unique queue each time as you are doing, and each message will be processed by only one of them.
private static void SetupBus(int instanceName)
{
    Bus.Initialize(sbc =>
    {
        sbc.UseRabbitMqRouting();
        // Same queue name on every node, so the nodes compete for each message.
        sbc.ReceiveFrom("rabbitmq://localhost/Commands");
        sbc.Subscribe(subs =>
        {
            subs.Handler<Message>(MessageHandled);
        });
    });
}
AFAIK, you'll need to have a separate process for command handlers as opposed to event handlers. All the command handlers will ReceiveFrom the same queue; all event handlers will ReceiveFrom their own unique queue.
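As a rough sketch of that split, the two kinds of process could be configured as below. It only reuses the configuration calls already shown in the question (which may differ between MassTransit versions). The DoWork and WorkDone message types and the "Events" queue prefix are hypothetical names introduced for illustration.

```csharp
using System;
using MassTransit;

// Hypothetical message types, not from the original post.
public class DoWork { public string Body { get; set; } }    // command: one node should handle it
public class WorkDone { public string Body { get; set; } }  // event: every node should see it

public static class BusSetup
{
    // Command-handler process: every node uses the SAME queue name,
    // so the nodes compete and each command is handled by only one of them.
    public static void SetupCommandBus()
    {
        Bus.Initialize(sbc =>
        {
            sbc.UseRabbitMqRouting();
            sbc.ReceiveFrom("rabbitmq://localhost/Commands");
            sbc.Subscribe(subs =>
            {
                subs.Handler<DoWork>(HandleCommand);
            });
        });
    }

    // Event-handler process: each node uses its OWN queue name
    // (the "Events" prefix is an assumption), so every node gets a copy.
    public static void SetupEventBus(int instanceName)
    {
        Bus.Initialize(sbc =>
        {
            sbc.UseRabbitMqRouting();
            sbc.ReceiveFrom("rabbitmq://localhost/Events" + instanceName);
            sbc.Subscribe(subs =>
            {
                subs.Handler<WorkDone>(HandleEvent);
            });
        });
    }

    private static void HandleCommand(DoWork msg)
    {
        Console.WriteLine("Command handled: " + msg.Body);
    }

    private static void HandleEvent(WorkDone msg)
    {
        Console.WriteLine("Event received: " + msg.Body);
    }
}
```

Using distinct types for commands and events keeps the shared queue from also being subscribed to the broadcast messages, which is one way to avoid the multiple-execution problem.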
The other piece of the puzzle is how you get messages into the bus. You can still use Publish for commands, but if you have configured the consumers incorrectly you could get multiple executions, because the message will go to all consumers. If you want to guarantee that the message ends up on a single queue, you can use Send rather than Publish.
Bus.Instance
    .GetEndpoint(new Uri("rabbitmq://localhost/Commands"))
    .Send(new Message
    {
        Id = g_Random.Next(),
        Body = "Some message",
        Sender = Assembly.GetEntryAssembly().GetName().Name
    });
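To put the two side by side, here is a minimal sketch of a dispatch helper, assuming the bus has already been initialized as in the question. The Dispatch class and the DoWork and WorkDone types are hypothetical; only Publish, GetEndpoint and Send come from the snippets above.

```csharp
using System;
using MassTransit;

// Hypothetical message types, not from the original post.
public class DoWork { public string Body { get; set; } }    // command
public class WorkDone { public string Body { get; set; } }  // event

public static class Dispatch
{
    // The shared queue that all command handlers ReceiveFrom.
    private static readonly Uri CommandQueue = new Uri("rabbitmq://localhost/Commands");

    // Send targets one specific endpoint, so the command lands on a single queue
    // and only one of the competing consumers picks it up.
    public static void SendCommand(DoWork command)
    {
        Bus.Instance.GetEndpoint(CommandQueue).Send(command);
    }

    // Publish goes to every queue subscribed to this message type,
    // so each node with its own queue receives a copy.
    public static void PublishEvent(WorkDone evt)
    {
        Bus.Instance.Publish(evt);
    }
}
```

A caller would then use Dispatch.SendCommand(new DoWork { Body = "..." }) for work that must run once, and Dispatch.PublishEvent(new WorkDone { Body = "..." }) for notifications that every node should see.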