Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

MassTransit and event versus command publishing

Tags:

masstransit

I'm new to MassTransit, and I miss something in my understanding.

Let's say I have a server farm were all nodes can do the same job. The application framework is CQRS's styled. That means I have two base kind of message to publish :

  • Commands : must be handled by exactly one of the server, any of them (the first with job slot free)
  • Events : must be handled by all servers

I've have build an extremely simple MassTransit prototype (a console application that is sending hello every X seconds).

In the API, I can see there is a "publish" method. How can I specify what kind of message it is (one versus all server)?

If I look a the "handler" configuration, I can specify the queue uri. If I specify the same queue for all hosts, all hosts will get the message, but I cannot limit the execution to only one server.

If I listen from a host dedicated queue, only one server will handle the messages, but I don't know how to broadcast the other kind of message.

Please help me to understand what I'm missing.

PS: if it cares, my messaging system is rabbitmq.

In order to test, I have create a common class library with this classes :

public static class ActualProgram
{
    private static readonly CancellationTokenSource g_Shutdown = new CancellationTokenSource();

    private static readonly Random g_Random = new Random();

    public static void ActualMain(int delay, int instanceName)
    {
        Thread.Sleep(delay);
        SetupBus(instanceName);

        Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);

        Console.WriteLine("Press enter at any time to exit");
        Console.ReadLine();
        g_Shutdown.Cancel();

        Bus.Shutdown();
    }

    private static void PublishRandomMessage()
    {
        Bus.Instance.Publish(new Message
        {
            Id = g_Random.Next(),
            Body = "Some message",
            Sender = Assembly.GetEntryAssembly().GetName().Name
        });

        if (!g_Shutdown.IsCancellationRequested)
        {
            Thread.Sleep(g_Random.Next(500, 10000));
            Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);
        }
    }

    private static void SetupBus(int instanceName)
    {
        Bus.Initialize(sbc =>
        {
            sbc.UseRabbitMqRouting();
            sbc.ReceiveFrom("rabbitmq://localhost/simple" + instanceName);
            sbc.Subscribe(subs =>
            {
                subs.Handler<Message>(MessageHandled);
            });
        });
    }

    private static void MessageHandled(Message msg)
    {
        ConsoleColor color = ConsoleColor.Red;
        switch (msg.Sender)
        {
            case "test_app1":
                color = ConsoleColor.Green;
                break;

            case "test_app2":
                color = ConsoleColor.Blue;
                break;

            case "test_app3":
                color = ConsoleColor.Yellow;
                break;
        }
        Console.ForegroundColor = color;
        Console.WriteLine(msg.ToString());
        Console.ResetColor();
    }

    private static void MessageConsumed(Message msg)
    {
        Console.WriteLine(msg.ToString());
    }
}

public class Message
{
    public long Id { get; set; }

    public string Sender { get; set; }

    public string Body { get; set; }

    public override string ToString()
    {
        return string.Format("[{0}] {1} : {2}" + Environment.NewLine, Id, Sender, Body);
    }
}

I have also 3 console applications that just run the ActualMain method :

internal class Program
{
    private static void Main(string[] args)
    {
        ActualProgram.ActualMain(0, 1);
    }
}
like image 419
Steve B Avatar asked Jul 27 '12 15:07

Steve B


1 Answers

What you want is known as Competing Consumers (search SO for that you'll find more info) Using RabbitMQ makes life easy, all you need to do is specify the same queue name for each consumer you start, the message will be processed by only one of them. Instead of generating a unique queue each time as you are doing.

private static void SetupBus(int instanceName)
{
    Bus.Initialize(sbc =>
    {
        sbc.UseRabbitMqRouting();
        sbc.ReceiveFrom("rabbitmq://localhost/Commands);
        sbc.Subscribe(subs =>
        {
            subs.Handler<Message>(MessageHandled);
        });
    });
}

AFAIK, you'll need to have a separate process for command handlers as opposed to event handlers. All the command handlers will ReceiveFrom the same queue, all event handlers will ReceiveFrom their own unique queue.

The other piece of the puzzle is how you get messages into the bus. You can still use publish for commands, but if you have configured consumers incorrectly you could get multiple executions as the message will go to all consumers, if you want to guarantee the message ends up on a single queue you can use Send rather than Publish.

     Bus.Instance
         .GetEndpoint(new Uri("rabbitmq://localhost/Commands"))
        .Send(new Message
        {
            Id = g_Random.Next(),
            Body = "Some message",
            Sender = Assembly.GetEntryAssembly().GetName().Name
        });
like image 70
Adam Mills Avatar answered Nov 12 '22 21:11

Adam Mills