I'd like to understand some details of the relations between command handlers, aggregates, the repository and the event store in CQRS-based systems.
What I've understood so far:
So far, so good. Now there are some issues that I did not yet get:
The following is based on my own experience and my experiments with various frameworks like Lokad.CQRS, NCQRS, etc. I'm sure there are multiple ways to handle this. I'll post what makes most sense to me.
1. Aggregate Creation:
Every time a command handler needs an aggregate, it uses a repository. The repository retrieves the respective list of events from the event store and calls an overloaded constructor, injecting the events:
    var stream = eventStore.LoadStream(id)
    var user = new User(stream)
If the aggregate didn't exist before, the stream will be empty and the newly created object will be in its original state. You might want to make sure that in this state only a few commands are allowed to bring the aggregate to life, e.g. User.Create().
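To make the rehydration step concrete, here is a minimal sketch in Python (chosen only for illustration; the idea is language-neutral). The `InMemoryEventStore` class, the `load_stream` method and the `user_name` field are hypothetical stand-ins, not the API of any of the frameworks mentioned above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class UserCreated:
    user_name: str  # hypothetical payload field


class User:
    """Aggregate whose state is rebuilt purely from past events."""

    def __init__(self, events=()):
        self.created = False
        self.user_name = None
        self.changes = []      # new events that are not persisted yet
        for event in events:   # replay history; nothing is recorded as new
            self._apply(event)

    def create(self, user_name):
        if self.created:
            raise ValueError("User already exists")
        event = UserCreated(user_name)
        self._apply(event)
        self.changes.append(event)

    def _apply(self, event):
        if isinstance(event, UserCreated):
            self.created = True
            self.user_name = event.user_name


class InMemoryEventStore:
    """Hypothetical store that maps a stream id to a list of events."""

    def __init__(self):
        self._streams = {}

    def load_stream(self, stream_id):
        return list(self._streams.get(stream_id, []))


store = InMemoryEventStore()
new_user = User(store.load_stream("user-1"))  # empty stream: initial state
existing = User([UserCreated("alice")])       # replayed stream: already created
```

An aggregate built from an empty stream stays in its initial state, so `create` is the only command that makes sense on it, while calling `create` on `existing` raises an error.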
2. Storage of new Events
Command handling happens inside a Unit of Work. During command execution every resulting event will be added to a list inside the aggregate (User.Changes). Once execution is finished, the changes will be appended to the event store. In the example below this happens in the following line:
store.AppendToStream(cmd.UserId, stream.Version, user.Changes)
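The version argument in that call is not explained above. One common reading, which I am assuming here, is that it acts as an optimistic concurrency check: the append only succeeds if the stream is still at the version that was loaded. A rough Python sketch of that assumption, with a hypothetical in-memory store:

```python
class ConcurrencyError(Exception):
    """Raised when the stream changed since it was loaded."""


class InMemoryEventStore:
    """Hypothetical store; the version is simply the number of stored events."""

    def __init__(self):
        self._streams = {}

    def load_stream(self, stream_id):
        events = list(self._streams.get(stream_id, []))
        return events, len(events)

    def append_to_stream(self, stream_id, expected_version, changes):
        stream = self._streams.setdefault(stream_id, [])
        if len(stream) != expected_version:
            raise ConcurrencyError(
                f"expected version {expected_version}, found {len(stream)}"
            )
        stream.extend(changes)


store = InMemoryEventStore()

# Two handlers load the same (empty) stream before either one writes.
_, version_a = store.load_stream("user-1")
_, version_b = store.load_stream("user-1")

store.append_to_stream("user-1", version_a, ["UserCreated"])
try:
    store.append_to_stream("user-1", version_b, ["UserCreated"])
    conflict = False
except ConcurrencyError:
    conflict = True  # the second writer worked on stale state
```

The second append is rejected because the stream moved from version 0 to version 1 in the meantime. The losing handler would typically reload the aggregate and retry the command.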
3. Order of Events
Just imagine what would happen if two subsequent CustomerMoved events are replayed in the wrong order.
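A tiny Python sketch shows the effect. The `Customer` class and the `new_city` field are made up for this illustration:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CustomerMoved:
    new_city: str  # hypothetical payload field


class Customer:
    def __init__(self, events=()):
        self.city = None
        for event in events:
            self.city = event.new_city  # the last event replayed wins


history = [CustomerMoved("Berlin"), CustomerMoved("Paris")]

in_order = Customer(history).city                    # "Paris"
out_of_order = Customer(list(reversed(history))).city  # "Berlin"
```

Both replays see the same two events, yet they end up with different current addresses. That is why the store has to return the events of a stream in exactly the order in which they were appended.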
An Example
I'll try to illustrate this with a piece of pseudo-code (I deliberately left repository concerns inside the command handler to show what would happen behind the scenes):
Application Service:
UserCommandHandler
    Handle(CreateUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Create(cmd.UserName, ...)
        store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

    Handle(BlockUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Block(cmd.Reason)  // assuming the command carries the reason
        store.AppendToStream(cmd.UserId, stream.Version, user.Changes)
Aggregate:
User
    created = false
    blocked = false
    Changes = new List<Event>

    ctor(eventStream)
        isNewEvent = false
        foreach (event in eventStream)
            this.Apply(event, isNewEvent)

    Create(userName, ...)
        if (this.created) throw "User already exists"
        isNewEvent = true
        this.Apply(new UserCreated(...), isNewEvent)

    Block(reason)
        if (!this.created) throw "No such user"
        if (this.blocked) throw "User is already blocked"
        isNewEvent = true
        this.Apply(new UserBlocked(...), isNewEvent)

    Apply(userCreatedEvent, isNewEvent)
        this.created = true
        if (isNewEvent) this.Changes.Add(userCreatedEvent)

    Apply(userBlockedEvent, isNewEvent)
        this.blocked = true
        if (isNewEvent) this.Changes.Add(userBlockedEvent)
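For comparison, here is a rough Python sketch of how the same flow might look once the load and append steps are moved out of the command handler into a repository. `UserRepository`, `InMemoryEventStore`, the tuple-shaped events and the version handling are all assumptions made for this sketch, not part of any particular framework.

```python
class User:
    def __init__(self, events=()):
        self.created = False
        self.blocked = False
        self.changes = []
        for event in events:
            self._apply(event, is_new=False)

    def create(self, user_name):
        if self.created:
            raise ValueError("User already exists")
        self._apply(("UserCreated", user_name), is_new=True)

    def block(self, reason):
        if not self.created:
            raise ValueError("No such user")
        if self.blocked:
            raise ValueError("User is already blocked")
        self._apply(("UserBlocked", reason), is_new=True)

    def _apply(self, event, is_new):
        kind = event[0]
        if kind == "UserCreated":
            self.created = True
        elif kind == "UserBlocked":
            self.blocked = True
        if is_new:
            self.changes.append(event)


class InMemoryEventStore:
    def __init__(self):
        self._streams = {}

    def load_stream(self, stream_id):
        events = list(self._streams.get(stream_id, []))
        return events, len(events)  # (events, version)

    def append_to_stream(self, stream_id, expected_version, changes):
        stream = self._streams.setdefault(stream_id, [])
        if len(stream) != expected_version:
            raise RuntimeError("stream was modified concurrently")
        stream.extend(changes)


class UserRepository:
    """Hides the event store from the command handler."""

    def __init__(self, store):
        self._store = store
        self._loaded_versions = {}

    def get(self, user_id):
        events, version = self._store.load_stream(user_id)
        self._loaded_versions[user_id] = version
        return User(events)

    def save(self, user_id, user):
        expected = self._loaded_versions[user_id]
        self._store.append_to_stream(user_id, expected, user.changes)
        self._loaded_versions[user_id] = expected + len(user.changes)
        user.changes = []  # everything is persisted now


class UserCommandHandler:
    def __init__(self, repository):
        self._repository = repository

    def handle_create_user(self, user_id, user_name):
        user = self._repository.get(user_id)
        user.create(user_name)
        self._repository.save(user_id, user)

    def handle_block_user(self, user_id, reason):
        user = self._repository.get(user_id)
        user.block(reason)
        self._repository.save(user_id, user)


store = InMemoryEventStore()
handler = UserCommandHandler(UserRepository(store))
handler.handle_create_user("user-1", "alice")
handler.handle_block_user("user-1", "spam")
events, version = store.load_stream("user-1")
```

The handler now only talks to the repository and the aggregate. Sending the same command twice fails inside the aggregate, because the replayed events have already flipped `created` or `blocked`.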
Update:
As a side note: Yves' answer reminded me of an interesting article by Udi Dahan from a couple of years ago: