How to implement a state machine with Automatonymous in C#

I am trying to implement a simple demo of a state machine using Automatonymous with RabbitMQ. Unfortunately, I could not find an example to rebuild and learn from (I found the ShoppingWeb sample, but in my eyes it is anything but simple). In my opinion, the documentation is also lacking information.

This is the state machine example I thought of (sorry, it's pretty ugly): [image: state machine diagram] Please note that this example is completely made up, and it's not important whether it makes sense. The purpose of this project is to get familiar with Automatonymous.

What I want to do / to have is:

  • Four applications running:
    1. The state machine itself
    2. The "requester" sending requests to be interpreted
    3. The "validator" or "parser" checking if the provided request is valid
    4. The "interpreter" interpreting the given request
  • An example of this could be:
    • Requester sends "x=5"
    • Validator checks if a "=" is contained
    • Interpreter says "5"

My implementation of the state machine looks like this:

public class InterpreterStateMachine : MassTransitStateMachine<InterpreterInstance>
    {
        public InterpreterStateMachine()
        {
            InstanceState(x => x.CurrentState);
            Event(() => Requesting, x => x.CorrelateBy(request => request.Request.RequestString, context => context.Message.Request.RequestString)
                .SelectId(context => Guid.NewGuid())); 
            Event(() => Validating, x => x.CorrelateBy(request => request.Request.RequestString, context => context.Message.Request.RequestString));
            Event(() => Interpreting, x => x.CorrelateBy(request => request.Request.RequestString, context => context.Message.Request.RequestString));

            Initially(
                When(Requesting)
                    .Then(context =>
                    {
                        context.Instance.Request = new Request(context.Data.Request.RequestString);                        
                    })
                    .ThenAsync(context => Console.Out.WriteLineAsync($"Request received: {context.Data.Request.RequestString}"))
                    .Publish(context => new ValidationNeededEvent(context.Instance))
                    .TransitionTo(Requested)
                );

            During(Requested,
                When(Validating)
                    .Then(context =>
                    {
                        context.Instance.Request.IsValid = context.Data.Request.IsValid;
                        if (!context.Data.Request.IsValid)
                        {
                            this.TransitionToState(context.Instance, Error);
                        }
                        else
                        {
                            this.TransitionToState(context.Instance, RequestValid);
                        }
                    })
                    .ThenAsync(context => Console.Out.WriteLineAsync($"Request '{context.Data.Request.RequestString}' validated with {context.Instance.Request.IsValid}"))
                    .Publish(context => new InterpretationNeededEvent(context.Instance))
                    ,
                Ignore(Requesting),
                Ignore(Interpreting)
                );

            During(RequestValid,
                When(Interpreting)
                    .Then((context) =>
                    {
                        //do something
                    })
                    .ThenAsync(context => Console.Out.WriteLineAsync($"Request '{context.Data.Request.RequestString}' interpreted with {context.Data.Answer}"))
                    .Publish(context => new AnswerReadyEvent(context.Instance))
                    .TransitionTo(AnswerReady)
                    .Finalize(),
                Ignore(Requesting),
                Ignore(Validating)
                );

            SetCompletedWhenFinalized();
        }

        public State Requested { get; private set; }
        public State RequestValid { get; private set; }
        public State AnswerReady { get; private set; }
        public State Error { get; private set; }

        //Someone is sending a request to interpret
        public Event<IRequesting> Requesting { get; private set; }
        //Request is validated
        public Event<IValidating> Validating { get; private set; }
        //Request is interpreted
        public Event<IInterpreting> Interpreting { get; private set; }


        class ValidationNeededEvent : IValidationNeeded
        {
            readonly InterpreterInstance _instance;

            public ValidationNeededEvent(InterpreterInstance instance)
            {
                _instance = instance;
            }

            public Guid RequestId => _instance.CorrelationId;

            public Request Request => _instance.Request;
        }

        class InterpretationNeededEvent : IInterpretationNeeded
        {
            readonly InterpreterInstance _instance;

            public InterpretationNeededEvent(InterpreterInstance instance)
            {
                _instance = instance;
            }

            public Guid RequestId => _instance.CorrelationId;
        }

        class AnswerReadyEvent : IAnswerReady
        {
            readonly InterpreterInstance _instance;

            public AnswerReadyEvent(InterpreterInstance instance)
            {
                _instance = instance;
            }

            public Guid RequestId => _instance.CorrelationId;
        }    
    }

Then I have services like this:

public class RequestService : ServiceControl
    {
        readonly IScheduler scheduler;
        IBusControl busControl;
        BusHandle busHandle;
        InterpreterStateMachine machine;
        InMemorySagaRepository<InterpreterInstance> repository;

        public RequestService()
        {
            scheduler = CreateScheduler();
        }

        public bool Start(HostControl hostControl)
        {
            Console.WriteLine("Creating bus...");

            machine = new InterpreterStateMachine();
            repository = new InMemorySagaRepository<InterpreterInstance>();


            busControl = Bus.Factory.CreateUsingRabbitMq(x =>
            {
                IRabbitMqHost host = x.Host(new Uri(/*rabbitMQ server*/), h =>
                {
                    /*credentials*/
                });

                x.UseInMemoryScheduler();

                x.ReceiveEndpoint(host, "interpreting_answer", e =>
                {
                    e.PrefetchCount = 5; //?
                    e.StateMachineSaga(machine, repository);
                });

                x.ReceiveEndpoint(host, "2", e =>
                {
                    e.PrefetchCount = 1;
                    x.UseMessageScheduler(e.InputAddress);

                    //Scheduling !?

                    e.Consumer(() => new ScheduleMessageConsumer(scheduler));
                    e.Consumer(() => new CancelScheduledMessageConsumer(scheduler));
                });

            });

            Console.WriteLine("Starting bus...");

            try
            {
                busHandle = MassTransit.Util.TaskUtil.Await<BusHandle>(() => busControl.StartAsync());
                scheduler.JobFactory = new MassTransitJobFactory(busControl);
                scheduler.Start();
            }
            catch (Exception)
            {
                scheduler.Shutdown();
                throw;
            }

            return true;
        }

        public bool Stop(HostControl hostControl)
        {
            Console.WriteLine("Stopping bus...");

            scheduler.Standby();

            if (busHandle != null) busHandle.Stop();

            scheduler.Shutdown();

            return true;
        }

        static IScheduler CreateScheduler()
        {
            ISchedulerFactory schedulerFactory = new StdSchedulerFactory();
            IScheduler scheduler = MassTransit.Util.TaskUtil.Await<IScheduler>(() => schedulerFactory.GetScheduler()); ;

            return scheduler;
        }
    }

My questions are:

  1. How do I send the "initial" request so that the state machine transitions to my initial state?
  2. How do I "react" within the consumers to check the data that were sent and then send new data like in 1?
asked May 18 '18 at 12:05 by quorti


1 Answer

Okay, I figured it out. I probably had problems because I'm not only new to MassTransit/Automatonymous and RabbitMQ, but also don't have much experience with C# yet.

If anyone ever has the same problem, here is what you need. Given the above example, three different kinds of application are needed, plus some small interfaces:

  1. A sender (in this case the "requester") including a specific consumer
  2. A service that consumes specific message types (the "validator" and "interpreter")
  3. A service that holds the state machine without a specific consumer
  4. Some "contracts", which are interfaces defining the type of message that's sent/consumed

1) This is the sender:

    using InterpreterStateMachine.Contracts;
    using MassTransit;
    using System;
    using System.Threading.Tasks;

    namespace InterpreterStateMachine.Requester
    {
        class Program
        {
            private static IBusControl _busControl;

            static void Main(string[] args)
            {            
                var busControl = ConfigureBus();
                busControl.Start();

                Console.WriteLine("Enter request or quit to exit: ");
                while (true)
                {
                    Console.Write("> ");
                    String value = Console.ReadLine();

                    if ("quit".Equals(value,StringComparison.OrdinalIgnoreCase))
                        break;

                    if (value != null)
                    {
                        String[] values = value.Split(';');

                        foreach (String v in values)
                        {
                            busControl.Publish<IRequesting>(new
                            {
                                Request = new Request(v),
                                TimeStamp = DateTime.UtcNow
                            });
                        }
                    }
                }

                busControl.Stop();
            }


            static IBusControl ConfigureBus()
            {
                if (null == _busControl)
                {
                    _busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
                    {                    
                        var host = cfg.Host(new Uri(/*rabbitMQ server*/), h =>
                        {                        
                            /*credentials*/
                        });

                        cfg.ReceiveEndpoint(host, "answer_ready", e =>
                        {
                            e.Durable = true;
                            //here the consumer is registered
                            e.Consumer<AnswerConsumer>();
                        });
                    });
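                    // Not started here: Main already calls Start() on the returned bus,
                    // so starting it in this method as well would be redundant.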
                }
                return _busControl;
            }

            //here comes the actual logic of the consumer, which consumes a "contract"
            class AnswerConsumer : IConsumer<IAnswerReady>
            {
                public async Task Consume(ConsumeContext<IAnswerReady> context)
                {
                    await Console.Out.WriteLineAsync($"\nReceived Answer for \"{context.Message.Request.RequestString}\": {context.Message.Answer}.");
                    await Console.Out.WriteAsync(">");
                }
            }        
        }
    }

2) This is the service (here it is the validation service):

using InterpreterStateMachine.Contracts;
using MassTransit;
using MassTransit.QuartzIntegration;
using MassTransit.RabbitMqTransport;
using Quartz;
using Quartz.Impl;
using System;
using System.Threading.Tasks;
using Topshelf;

namespace InterpreterStateMachine.Validator
{
    public class ValidationService : ServiceControl
    {
        readonly IScheduler _scheduler;
        static IBusControl _busControl;
        BusHandle _busHandle;        

        public static IBus Bus => _busControl;

        public ValidationService()
        {
            _scheduler = CreateScheduler();
        }

        public bool Start(HostControl hostControl)
        {
            Console.WriteLine("Creating bus...");

            _busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(x =>
            {
                IRabbitMqHost host = x.Host(new Uri(/*rabbitMQ server*/), h =>
                {
                    /*credentials*/
                });

                x.UseInMemoryScheduler();
                x.UseMessageScheduler(new Uri(RabbitMqServerAddress));

                x.ReceiveEndpoint(host, "validation_needed", e =>
                {
                    e.PrefetchCount = 1;
                    e.Durable = true;
                    //again this is how the consumer is registered
                    e.Consumer<RequestConsumer>();
                });                               
            });

            Console.WriteLine("Starting bus...");

            try
            {
                _busHandle = MassTransit.Util.TaskUtil.Await<BusHandle>(() => _busControl.StartAsync());
                _scheduler.JobFactory = new MassTransitJobFactory(_busControl);
                _scheduler.Start();
            }
            catch (Exception)
            {
                _scheduler.Shutdown();
                throw;
            }                
            return true;
        }

        public bool Stop(HostControl hostControl)
        {
            Console.WriteLine("Stopping bus...");
            _scheduler.Standby();
            _busHandle?.Stop();
            _scheduler.Shutdown();
            return true;
        }

        static IScheduler CreateScheduler()
        {
            ISchedulerFactory schedulerFactory = new StdSchedulerFactory();
            IScheduler scheduler = MassTransit.Util.TaskUtil.Await<IScheduler>(() => schedulerFactory.GetScheduler());

            return scheduler;
        }
    }

    //again here comes the actual consumer logic, look how the message is re-published after it was checked
    class RequestConsumer : IConsumer<IValidationNeeded>
    {
        public async Task Consume(ConsumeContext<IValidationNeeded> context)
        {
            await Console.Out.WriteLineAsync($"(c) Received {context.Message.Request.RequestString} for validation (Id: {context.Message.RequestId}).");

            context.Message.Request.IsValid = context.Message.Request.RequestString.Contains("=");

            //send the new message on the "old" context
            await context.Publish<IValidating>(new
            {
                Request = context.Message.Request,
                IsValid = context.Message.Request.IsValid,
                TimeStamp = DateTime.UtcNow,
                RequestId = context.Message.RequestId
            });
        }
    }
}

The validator consumes the contract "IValidationNeeded" and then publishes the contract "IValidating", which is in turn consumed by the state machine itself (the "Validating" event).
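The interpreter service is not shown in this answer; presumably it follows the same consume-then-publish pattern. The rules the two consumers apply can be sketched as plain functions, based on the "x=5" example from the question. `RequestRules`, `IsValid`, and `Interpret` are hypothetical names introduced for illustration; they are not part of MassTransit or of the original project, and the "everything after the first '='" rule is an assumption:

```csharp
using System;

// Hypothetical helper, not part of the original project: the checks that the
// validator and interpreter consumers would run inside their Consume methods.
public static class RequestRules
{
    // Validator rule from the question: a request is valid if it contains "=".
    public static bool IsValid(string requestString)
    {
        return requestString != null && requestString.Contains("=");
    }

    // Interpreter rule from the question's example: "x=5" is answered with "5".
    // Assumption: the answer is everything after the first "=", trimmed.
    public static string Interpret(string requestString)
    {
        if (!IsValid(requestString))
            throw new ArgumentException("Request must contain '='.", nameof(requestString));

        int index = requestString.IndexOf('=');
        return requestString.Substring(index + 1).Trim();
    }
}
```

In the validator above, the inline `Contains("=")` check could be replaced by `RequestRules.IsValid(...)`. An interpreter consumer would call `RequestRules.Interpret(...)` in the same place and publish the result for the state machine's "Interpreting" event. Keeping the rules free of bus code also makes them easy to test without RabbitMQ.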

3) The difference between a consumer service and the state machine service lies within the "ReceiveEndpoint". No consumer is registered here; instead, the state machine is attached:

...
InterpreterStateMachine _machine = new InterpreterStateMachine();
InMemorySagaRepository<InterpreterInstance> _repository = new InMemorySagaRepository<InterpreterInstance>();
...
x.ReceiveEndpoint(host, "state_machine", e =>
{
    e.PrefetchCount = 1;
    //here the state machine is set
    e.StateMachineSaga(_machine, _repository);
    e.Durable = false;
});

4) Last but not least, the contracts are pretty small and look like this:

using System;

namespace InterpreterStateMachine.Contracts
{
    public interface IValidationNeeded
    {
        Guid RequestId { get; }
        Request Request { get; }
    }
}
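Only IValidationNeeded is shown above. Judging from how the messages are used in the code in this thread, the remaining contracts might look roughly like the following. This is a sketch inferred from usage, not the original source: the property types (for example `string` for `Answer` and `DateTime` for `TimeStamp`) are assumptions, and IAnswerReady is given the `Request` and `Answer` properties that the requester's consumer reads.

```csharp
using System;

namespace InterpreterStateMachine.Contracts
{
    // Payload carried by the messages. Inferred from `new Request(v)`,
    // `Request.RequestString` and `Request.IsValid` in the code above.
    public class Request
    {
        public Request(string requestString)
        {
            RequestString = requestString;
        }

        public string RequestString { get; set; }
        public bool IsValid { get; set; }
    }

    // Published by the requester; handled by the "Requesting" event.
    public interface IRequesting
    {
        Request Request { get; }
        DateTime TimeStamp { get; }
    }

    // Published by the validator; handled by the "Validating" event.
    public interface IValidating
    {
        Guid RequestId { get; }
        Request Request { get; }
        bool IsValid { get; }
        DateTime TimeStamp { get; }
    }

    // Published by the interpreter; handled by the "Interpreting" event.
    // Assumption: the answer is transported as a string.
    public interface IInterpreting
    {
        Request Request { get; }
        string Answer { get; }
    }

    // Published by the state machine and consumed by the requester.
    public interface IAnswerReady
    {
        Guid RequestId { get; }
        Request Request { get; }
        string Answer { get; }
    }
}
```

The interfaces only declare getters because the senders publish anonymous objects (`new { Request = ..., TimeStamp = ... }`) against them, as the requester and validator above do.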

So overall it's pretty straightforward, I just had to use my brain :D

I hope this will help someone.

answered Nov 14 '22 at 22:11 by quorti