Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Relation between command handlers, aggregates, the repository and the event store in CQRS

I'd like to understand some details of the relations between command handlers, aggregates, the repository and the event store in CQRS-based systems.

What I've understood so far:

  • Command handlers receive commands from the bus. They are responsible for loading the appropriate aggregate from the repository and call the domain logic on the aggregate. Once finished, they remove the command from the bus.
  • An aggregate provides behavior and an internal state. State is never public. The only way to change state is by using the behavior. The methods that model this behavior create events from the command's properties, and apply these events to the aggregate, which in turn call an event handlers that sets the internal state accordingly.
  • The repository simply allows loading aggregates on a given ID, and adding new aggregates. Basically, the repository connects the domain to the event store.
  • The event store, last but not least, is responsible for storing events to a database (or whatever storage is used), and reloading these events as a so-called event stream.

So far, so good. Now there are some issues that I did not yet get:

  • If a command handler is to call behavior on a yet existing aggregate, everything is quite easy. The command handler gets a reference to the repository, calls its loadById method and the aggregate is returned. But what does the command handler do when there is no aggregate yet, but one should be created? From my understanding the aggregate should later-on be rebuilt using the events. This means that creation of the aggregate is done in reply to a fooCreated event. But to be able to store any event (including the fooCreated one), I need an aggregate. So this looks to me like a chicken-and-egg problem: I can not create the aggregate without the event, but the only component that should create events is the aggregate. So basically it comes down to: How do I create new aggregates, who does what?
  • When an aggregate triggers an event, an internal event handler responses to it (typically by being called via an apply method) and changes the aggregate's state. How is this event handed over to the repository? Who originates the "please send the new events to the repository / event store" action? The aggregate itself? The repository by watching the aggregate? Someone else who is subscribed to the internal events? ...?
  • Last but not least I have a problem understanding the concept of an event stream correctly: In my imagination, it's simply something like an ordered list of events. What's of importance is that it's "ordered". Is this right?
like image 847
Golo Roden Avatar asked Sep 11 '12 04:09

Golo Roden


1 Answers

The following is based on my own experience and my experiments with various frameworks like Lokad.CQRS, NCQRS, etc. I'm sure there are multiple ways to handle this. I'll post what makes most sense to me.

1. Aggregate Creation:

Every time a command handler needs an aggregate, it uses a repository. The repository retrieves the respective list of events from the event store and calls an overloaded constructor, injecting the events

var stream = eventStore.LoadStream(id) var User = new User(stream) 

If the aggregate didn't exist before, the stream will be empty and the newly created object will be in it's original state. You might want to make sure that in this state only a few commands are allowed to bring the aggregate to life, e.g. User.Create().

2. Storage of new Events

Command handling happens inside a Unit of Work. During command execution every resulting event will be added to a list inside the aggregate (User.Changes). Once execution is finished, the changes will be appended to the event store. In the example below this happens in the following line:

store.AppendToStream(cmd.UserId, stream.Version, user.Changes) 

3. Order of Events

Just imagine what would happen, if two subsequent CustomerMoved events are replayed in the wrong order.

An Example

I'll try to illustrate the with a piece of pseudo-code (I deliberately left repository concerns inside the command handler to show what would happen behind the scenes):

Application Service:

UserCommandHandler     Handle(CreateUser cmd)         stream = store.LoadStream(cmd.UserId)         user = new User(stream.Events)         user.Create(cmd.UserName, ...)         store.AppendToStream(cmd.UserId, stream.Version, user.Changes)      Handle(BlockUser cmd)         stream = store.LoadStream(cmd.UserId)         user = new User(stream.Events)         user.Block(string reason)         store.AppendToStream(cmd.UserId, stream.Version, user.Changes) 

Aggregate:

User     created = false     blocked = false      Changes = new List<Event>      ctor(eventStream)         isNewEvent = false         foreach (event in eventStream)             this.Apply(event, isNewEvent)      Create(userName, ...)         if (this.created) throw "User already exists"         isNewEvent = true         this.Apply(new UserCreated(...), isNewEvent)      Block(reason)         if (!this.created) throw "No such user"         if (this.blocked) throw "User is already blocked"         isNewEvent = true         this.Apply(new UserBlocked(...), isNewEvent)      Apply(userCreatedEvent, isNewEvent)         this.created = true         if (isNewEvent) this.Changes.Add(userCreatedEvent)      Apply(userBlockedEvent, isNewEvent)         this.blocked = true         if (isNewEvent) this.Changes.Add(userBlockedEvent) 

Update:

As a side note: Yves' answer reminded me of an interesting article by Udi Dahan from a couple of years ago:

  • Don’t Create Aggregate Roots
like image 145
Dennis Traub Avatar answered Sep 26 '22 14:09

Dennis Traub