I am using CqrsLite for a CQRS-style project. The Save method of the concrete Repository implementation looks like so (with irrelevant lines omitted).
public void Save<T>(T aggregate, int? expectedVersion = null) where T : AggregateRoot
{
    if (expectedVersion != null && _eventStore.Get(typeof(T), aggregate.Id, expectedVersion.Value).Any())
        throw new ConcurrencyException(aggregate.Id);

    var i = 0;
    foreach (var @event in aggregate.GetUncommittedChanges())
    {
        // ... [irrelevant code removed] ...
        _eventStore.Save(typeof(T), @event);
        _publisher.Publish(@event);
    }
    aggregate.MarkChangesAsCommitted();
}
What's troubling me is that this method publishes events to subscribers BEFORE the aggregate is told to mark them as committed. Thus, if an event handler that observes a given event throws, the aggregate will not have marked as committed the changes that earlier event handlers have already been notified of.
Why would I not move _publisher.Publish(@event) to after aggregate.MarkChangesAsCommitted(), like so? What am I missing?
public void Save<T>(T aggregate, int? expectedVersion = null) where T : AggregateRoot
{
    if (expectedVersion != null && _eventStore.Get(typeof(T), aggregate.Id, expectedVersion.Value).Any())
        throw new ConcurrencyException(aggregate.Id);

    var events = aggregate.GetUncommittedChanges().ToList();
    foreach (var @event in events)
    {
        // ... [irrelevant code removed] ...
        _eventStore.Save(typeof(T), @event);
    }
    aggregate.MarkChangesAsCommitted();

    foreach (var @event in events)
        _publisher.Publish(@event);
}
Both approaches are problematic because there might be an error between Save and Publish, no matter in which order the two methods are called. This can lead to unsaved events being published, or to saved events not being published. The problem of in-memory state corruption (in the aggregate objects) exists as well (although that could be handled by simply catching errors thrown by event handlers).
One solution to this problem would be to use two-phase commit (available, e.g., if your event store is SQL Server-based and the publisher is MSMQ-based). However, this has performance, scalability, and operational implications, and it doesn't allow late subscribers (see below).
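As a rough illustration only (not CqrsLite's API): assuming a hypothetical SQL-backed event store (ISqlEventStore with an Append method is made up here) and an MSMQ publisher, the two writes could be wrapped in a TransactionScope, which MSDTC escalates to a distributed two-phase-commit transaction:

using System;
using System.Messaging;        // MSMQ client API
using System.Transactions;

public interface ISqlEventStore
{
    void Append(Guid aggregateId, object @event);   // hypothetical append API
}

public sealed class TwoPhaseCommitSaver
{
    private readonly ISqlEventStore _sqlEventStore;  // hypothetical SQL-backed store

    public TwoPhaseCommitSaver(ISqlEventStore sqlEventStore)
    {
        _sqlEventStore = sqlEventStore;
    }

    public void Save(Guid aggregateId, object @event)
    {
        // Both operations enlist in the same ambient transaction; MSDTC
        // coordinates the SQL connection and the message queue.
        using (var scope = new TransactionScope())
        {
            _sqlEventStore.Append(aggregateId, @event);

            using (var queue = new MessageQueue(@".\private$\events"))
            {
                // The queue must be transactional; Automatic enlists the send
                // in the ambient TransactionScope.
                queue.Send(@event, MessageQueueTransactionType.Automatic);
            }

            scope.Complete();   // both commit, or neither does
        }
    }
}

This is exactly the part that hurts operationally: you now depend on MSDTC and transactional queues, which is why the pull-based approach below is usually preferred.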
The better approach is to allow parties interested in events to pull them out of the event store (ideally combining this with some sort of notification mechanism or long polling to make it more "reactive"). This moves the responsibility of tracking the last received event to the subscriber, allowing late subscribers to catch up at any time by reading from the last position they processed.
You should find more about this approach by searching for something like "using the event store as a queue", and the video from Greg's answer probably adds a lot to this as well.
A common algorithm is this one (a sketch follows the list):

- the subscriber remembers the sequence number of the last event it has processed,
- it asks the event store (periodically, or when notified) for all events after that number,
- it handles them in order and then advances (and persists) its stored sequence number.
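A minimal sketch of that algorithm, assuming a hypothetical read API on the event store that exposes a global sequence number and a GetEventsAfter query (these names are made up; CqrsLite's IEventStore has no such method):

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical read-side API: all events after a given global sequence number.
public interface IEventStoreReader
{
    IReadOnlyList<StoredEvent> GetEventsAfter(long sequenceNumber, int maxCount);
}

public sealed class StoredEvent
{
    public long SequenceNumber { get; set; }
    public object Payload { get; set; }
}

public sealed class PollingSubscriber
{
    private readonly IEventStoreReader _reader;
    private readonly Action<object> _handle;
    private long _lastProcessed;   // in real code, persisted together with the read model

    public PollingSubscriber(IEventStoreReader reader, Action<object> handle, long lastProcessed)
    {
        _reader = reader;
        _handle = handle;
        _lastProcessed = lastProcessed;
    }

    public async Task RunAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            // 1. Ask the store for everything after the last processed position.
            var batch = _reader.GetEventsAfter(_lastProcessed, maxCount: 100);

            foreach (var stored in batch)
            {
                // 2. Handle the event, then advance the stored position.
                _handle(stored.Payload);
                _lastProcessed = stored.SequenceNumber;
            }

            // 3. Nothing new: wait a bit (or block on a notification / long poll)
            //    before asking again. Cancellation ends the loop via an exception.
            if (batch.Count == 0)
                await Task.Delay(TimeSpan.FromMilliseconds(500), token);
        }
    }
}

Because the subscriber owns its position, a crash on either side only means events are delivered again or later, never lost, and a brand-new subscriber can start from position 0 and replay the full history.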
I'd like to add that I don't consider event stores that ignore the Save/Publish problem production-ready. For alternatives, see Greg Young's Event Store or the (currently more or less unmaintained) NEventStore.