Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

EventSourced Saga Implementation

I have written an Event Sourced Aggregate and now implemented an Event Sourced Saga... I have noticed the two are similair and created an event sourced object as a base class from which both derive.

I have seen one demo here http://blog.jonathanoliver.com/cqrs-sagas-with-event-sourcing-part-ii-of-ii/ but feel there may be an issue as Commands could be lost in the event of a process crash as the sending of commands is outside the write transaction?

public void Save(ISaga saga)
{
    var events = saga.GetUncommittedEvents();
    eventStore.Write(new UncommittedEventStream
    {
        Id = saga.Id,
        Type = saga.GetType(),
        Events = events,
        ExpectedVersion = saga.Version - events.Count
    });

    foreach (var message in saga.GetUndispatchedMessages())
        bus.Send(message); // can be done in different ways

    saga.ClearUncommittedEvents();
    saga.ClearUndispatchedMessages();
}

Instead I am using Greg Young's EventStore and when I save an EventSourcedObject (either an aggregate or a saga) the sequence is as follows:

  1. Repository gets list of new MutatingEvents.
  2. Writes them to stream.
  3. EventStore fires off new events when streams are written to and committed to the stream.
  4. We listen for the events from the EventStore and handle them in EventHandlers.

I am implementing the two aspects of a saga:

  1. To take in events, which may transition state, which in turn may emit commands.
  2. To have an alarm where at some point in the future (via an external timer service) we can be called back).

Questions

  1. As I understand event handlers should not emit commands (what happens if the command fails?) - but am I OK with the above since the Saga is the actual thing controlling the creation of commands (in reaction to events) via this event proxy, and any failure of Command sending can be handled externally (in the external EventHandler that deals with CommandEmittedFromSaga and resends if the command fails)?

  2. Or do I forget wrapping events and store native Commands and Events in the same stream (intermixed with a base class Message - the Saga would consume both Commands and Events, an Aggregate would only consume Events)?

  3. Any other reference material on the net for implementation of event sourced Sagas? Anything I can sanity check my ideas against?

Some background code is below.

Saga issues a command to Run (wrapped in a CommandEmittedFromSaga event)

Command below is wrapped inside event:

public class CommandEmittedFromSaga : Event
{
    public readonly Command Command;
    public readonly Identity SagaIdentity;
    public readonly Type SagaType;

    public CommandEmittedFromSaga(Identity sagaIdentity, Type sagaType, Command command)
    {
        Command = command;
        SagaType = sagaType;
        SagaIdentity = sagaIdentity;
    }
}

Saga requests a callback at some point in future (AlarmRequestedBySaga event)

Alarm callback request is wrapped onside an event, and will fire back and event to the Saga on or after the requested time:

public class AlarmRequestedBySaga : Event
{
    public readonly Event Event;
    public readonly DateTime FireOn;
    public readonly Identity Identity;
    public readonly Type SagaType;

    public AlarmRequestedBySaga(Identity identity, Type sagaType, Event @event, DateTime fireOn)
    {
        Identity = identity;
        SagaType = sagaType;
        Event = @event;
        FireOn = fireOn;
    }
}

Alternatively I can store both Commands and Events in the same stream of base type Message

public abstract class EventSourcedSaga
{
    protected EventSourcedSaga() { }

    protected EventSourcedSaga(Identity id, IEnumerable<Message> messages)
    {
        Identity = id;

        if (messages == null) throw new ArgumentNullException(nameof(messages));

        var count = 0;

        foreach (var message in messages)
        {
            var ev = message as Event;
            var command = message as Command;

            if(ev != null) Transition(ev);
            else if(command != null) _messages.Add(command);
            else throw new Exception($"Unsupported message type {message.GetType()}");

            count++;
        }

        if (count == 0)
            throw new ArgumentException("No messages provided");

        // All we need to know is the original number of events this
        // entity has had applied at time of construction.
        _unmutatedVersion = count;
        _constructing = false;
    }

    readonly IEventDispatchStrategy _dispatcher = new EventDispatchByReflectionStrategy("When");
    readonly List<Message> _messages = new List<Message>();
    readonly int _unmutatedVersion;
    private readonly bool _constructing = true;
    public readonly Identity Identity;

    public IList<Message> GetMessages()
    {
        return _messages.ToArray();
    }

    public void Transition(Event e)
    {
        _messages.Add(e);
        _dispatcher.Dispatch(this, e);
    }

    protected void SendCommand(Command c)
    {
        // Don't add a command whilst we are in the constructor. Message
        // state transition during construction must not generate new
        // commands, as those command will already be in the message list.
        if (_constructing) return;

        _messages.Add(c);
    }

    public int UnmutatedVersion() => _unmutatedVersion;
}
like image 897
morleyc Avatar asked Oct 30 '15 06:10

morleyc


1 Answers

I believe the first two questions are the result of a wrong understanding of Process Managers (aka Sagas, see note on terminology at bottom).

Shift your thinking

It seems like you are trying to model it (as I once did) as an inverse aggregate. The problem with that: the "social contract" of an aggregate is that its inputs (commands) can change over time (because systems must be able to change over time), but its outputs (events) cannot. Once written, events are a matter of history and the system must always be able to handle them. With that condition in place, an aggregate can be reliably loaded from an immutable event stream.

If you try to just reverse the inputs and outputs as a process manager implementation, it's output cannot be a matter of record because commands can be deprecated and removed from the system over time. When you try to load a stream with a removed command, it will crash. Therefore a process manager modeled as an inverse aggregate could not be reliably reloaded from an immutable message stream. (Well I'm sure you could devise a way... but is it wise?)

So let's think about implementing a Process Manager by looking at what it replaces. Take for example an employee who manages a process like order fulfillment. The first thing you do for this user is setup a view in the UI for them to look at. The second thing you do is to make buttons in the UI for the user to perform actions in response to what they see on the view. Ex. "This row has PaymentFailed, so I click CancelOrder. This row has PaymentSucceeded and OrderItemOutOfStock, so I click ChangeToBackOrder. This order is Pending and 1 day old, so I click FlagOrderForReview"... and so forth. Once the decision process is well-defined and starts requiring too much of the user's time, you are tasked to automate this process. To automate it, everything else can stay the same (the view, even some of the UI so you can check on it), but the user has changed to be a piece of code.

"Go away or I will replace you with a very small shell script."

The process manager code now periodically reads the view and may issue commands if certain data conditions are present. Essentially, the simplest version of a Process Manager is some code that runs on a timer (e.g. every hour) and depends on particular view(s). That's the place where I would start... with stuff you already have (views/view updaters) and minimal additions (code that runs periodically). Even if you decide later that you need different capability for certain use cases, "Future You" will have a better idea of the specific shortcomings that need addressing.

And this is a great place to remind you of Gall's law and probably also YAGNI.

  1. Any other reference material on the net for implementation of event sourced Sagas? Anything I can sanity check my ideas against?

Good material is hard to find as these concepts have very malleable implementations, and there are diverse examples, many of which are over-engineered for general purposes. However, here are some references that I have used in the answer.

DDD - Evolving Business Processes
DDD/CQRS Google Group (lots of reading material)


Note that the term Saga has a different implication than a Process Manager. A common saga implementation is basically a routing slip with each step and its corresponding failure compensation included on the slip. This depends on each receiver of the routing slip performing what is specified on the routing slip and successfully passing it on to the next hop or performing the failure compensation and routing backward. This may be a bit too optimistic when dealing with multiple systems managed by different groups, so process managers are often used instead. See this SO question for more information.

like image 59
Kasey Speakman Avatar answered Sep 29 '22 07:09

Kasey Speakman