 

CQRS + ES: Can you process multiple commands in parallel?

I can see how commands can be executed in a strict, one-at-a-time sequence with the CQRS + ES pattern. Having multiple commands executing in parallel would significantly increase the throughput of the system and make use of all the cores I have on the server. But surely this risks concurrency issues?

Take a simple example: two users update the customer's name and submit the commands at the same time. In parallel, two processes replay previous events to get aggregates into the latest state, and then both determine everything is valid. Both then save a new update event. Whichever one saves last would seem to be the winner, because any future replay would result in that last update being the customer's name.

What if the result of the command is to send an email to the customer? Now we send two emails instead of the expected one. What if the command results in creating a new aggregate that gets attached to the customer via a customer property? We end up with two new aggregates created, but one is a zombie: it is never referenced by the customer, because the other aggregate has been saved as the reference instead.

It feels like you would need to implement your own transaction locking capability to ensure you never get overwrites, and that is a complex solution I would rather avoid!

UPDATE:

My example was a little too simple. Imagine we have an Invoice and Item entries that belong to it. After adding/changing/removing an Item on the Invoice we need to compute the new total and set it on the Invoice.Total property.

If two users add a new Item in parallel, then without an explicit locking mechanism the total will be wrong. Both recreate the Invoice aggregate and the existing list of Item child aggregates for that Invoice. In parallel, both create a new Item, add up the new total, and set it on the Invoice.Total property.

Both then save events for these actions. But now we have an error, because whichever command finished last will have a total that does not include the new item from the instance that finished first. This has nothing to do with pessimistic/optimistic concurrency but with transactional locking.
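To make the race concrete, here is a rough sketch of what both handlers do (Invoice, add_item and the amounts are illustrative names and values, not from any particular framework):

```python
# Rough sketch of the lost-update race described above.

class Invoice:
    def __init__(self):
        self.items = []   # amounts of the child Item aggregates
        self.total = 0

    def add_item(self, amount):
        self.items.append(amount)
        self.total = sum(self.items)  # recompute Invoice.Total

def handle_add_item(amount):
    # Each handler replays the same history and sees two existing items.
    invoice = Invoice()
    invoice.add_item(10)
    invoice.add_item(20)   # state rebuilt from events: total = 30
    invoice.add_item(amount)
    return invoice.total

# User A and user B run concurrently against the same history:
total_a = handle_add_item(5)   # 35 -- unaware of B's item
total_b = handle_add_item(7)   # 37 -- unaware of A's item
# Whichever total is saved last wins; neither is the correct 42.
```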

asked Dec 23 '22 by Phil Wright


2 Answers

I can see how commands can be executed in a strict, one-at-a-time sequence with the CQRS + ES pattern. Having multiple commands executing in parallel would significantly increase the throughput of the system and make use of all the cores I have on the server. But surely this risks concurrency issues?

From the CQRS+ES point of view there is no concurrency issue. Any good Event store implementation has an Aggregate version constraint that guards against concurrent event appends on the same Aggregate, using optimistic locking for example.
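As a minimal sketch of such a version guard (a hypothetical in-memory store, not any particular product's API):

```python
import threading

class ConcurrencyError(Exception):
    """Raised when the expected aggregate version no longer matches."""

class InMemoryEventStore:
    def __init__(self):
        self._streams = {}           # aggregate_id -> list of events
        self._lock = threading.Lock()

    def append(self, aggregate_id, expected_version, events):
        # Optimistic lock: the append succeeds only if nobody else has
        # appended to this aggregate since the caller loaded it.
        with self._lock:
            stream = self._streams.setdefault(aggregate_id, [])
            if len(stream) != expected_version:
                raise ConcurrencyError(
                    f"expected version {expected_version}, found {len(stream)}")
            stream.extend(events)
            return len(stream)       # the new aggregate version
```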

UPDATE: CQRS+ES loves and welcomes concurrent commands.

What if the result of the command is to send an email to the customer? Now we send two emails instead of the expected one.

Expected by whom? Again, this is not a CQRS+ES problem; it is a business design issue. In this particular case you should design, in another bounded context, an Aggregate that deals with sending emails to the client, with a business invariant ensuring that multiple emails regarding the same topic (e.g. a username change) are grouped into a single one, the last one taking priority.
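One hypothetical way to express such an invariant is an outbox that collapses pending notifications per customer and topic, the latest one replacing the earlier ones (a sketch of one possible design, not a prescribed API):

```python
# Hypothetical sketch: collapse concurrent notifications about the same
# topic (e.g. a username change) into a single email per customer.

class EmailOutbox:
    def __init__(self):
        self._pending = {}   # (customer_id, topic) -> latest message body

    def enqueue(self, customer_id, topic, body):
        # A later message on the same topic replaces the earlier one.
        self._pending[(customer_id, topic)] = body

    def flush(self, send):
        # Send one email per (customer, topic), then clear the outbox.
        for (customer_id, topic), body in self._pending.items():
            send(customer_id, topic, body)
        self._pending.clear()
```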

What if the command results in creating a new aggregate that gets attached to the customer via a customer property? We end up with two new aggregates created, but one is a zombie: it is never referenced by the customer, because the other aggregate has been saved as the reference instead.

Again, it depends on your business. From the CQRS+ES point of view there is no problem. Event stores are great at dealing with a lot of data (and this is because an event store is an append-only persistence). That zombie aggregate is harmless. It occupies only a few events in the event store, and the projections (read models) are unaffected, as it is simply not present there.

It feels like you would need to implement your own transaction locking capability to ensure you never get overwrites and that is a complex solution I would rather avoid!

This is a problem that you would have regardless of using CQRS+ES. If this is a business requirement you have to do it anyway.

In the case of two users changing the username at the same time, you could do some additional checks and inform the losing user that the other user has modified the username, possibly even showing the new username and letting them decide what to do next: retry or cancel.

AFTER UPDATE:

My example was a little too simple. Imagine we have an Invoice and Item entries that belong to it. After adding/changing/removing an Item on the Invoice we need to compute the new total and set it on the Invoice.Total property.

If two users add a new Item in parallel, then without an explicit locking mechanism the total will be wrong. Both recreate the Invoice aggregate and the existing list of Item child aggregates for that Invoice. In parallel, both create a new Item, add up the new total, and set it on the Invoice.Total property.

Both then save events for these actions. But now we have an error, because whichever command finished last will have a total that does not include the new item from the instance that finished first. This has nothing to do with pessimistic/optimistic concurrency but with transactional locking.

I think you must understand how Event sourcing works. Let's suppose that both users have the Invoice in front of them (in fact it is the Cart that they see, not the Invoice; the Invoice is generated after an Order is placed, but for the sake of simplicity let's assume it is the Invoice). Let's suppose they see 2 items, and each of them wants to add a new item. Both of them initiate a command: AddAnItemToInvoiceCommand. This is what happens:

  • the command dispatcher/handler receives both commands at the same time.

  • for each command, it loads the aggregate from the repository and notes the aggregate version; let's suppose it is 10.

  • for each command, the handler calls a method on the aggregate that adds an item; an ItemWasAddedToInvoiceEvent is thus generated twice, once per command handler execution.

  • for each command, the handler tries to persist the event to the Event store.

  • for the first persist attempt (there is always a first, if only at the nanosecond level) the event is persisted and the aggregate version is incremented; it is now 11.

  • for the second, identical event, generated by the second command handler, the event is not persisted because the expected version 10 is not found: the version is now 11. So the repository retries the command execution (this is very important to understand). That is, the whole command execution runs again: the aggregate is loaded again from the repository, now at version 11, the command is applied to it, and the event is persisted to the Event store. A sketch of this retry loop follows below.

In the end, two events are added to the event store and the Invoice has the correct state: there are now 4 items, with the total price correctly being the sum of all 4.
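Here is a sketch of that retry loop, with hypothetical repository and command names, reusing the ConcurrencyError from the store sketch above:

```python
# Sketch of the retry: on a version conflict the whole command execution
# is repeated against the freshly loaded aggregate.

def execute(command, repository, max_retries=3):
    for _ in range(max_retries):
        # Reload from history; the version reflects all persisted events.
        aggregate, version = repository.load(command.aggregate_id)
        events = aggregate.handle(command)   # e.g. AddAnItemToInvoiceCommand
        try:
            repository.save(command.aggregate_id, version, events)
            return
        except ConcurrencyError:
            continue   # someone appended first; replay and try again
    raise RuntimeError("command failed after repeated version conflicts")
```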

answered May 14 '23 by Constantin Galbenu


This isn't really a CQRS issue at all; it's specific to event storage, and to a particular strategy of event storage.

Take a simple example: two users update the customer's name and submit the commands at the same time. In parallel, two processes replay previous events to get aggregates into the latest state, and then both determine everything is valid. Both then save a new update event. Whichever one saves last would seem to be the winner, because any future replay would result in that last update being the customer's name.

There are a few things to notice here -- one is that the same issue exists if these commands are run sequentially; the second undoes the effects of the first. That is exactly the behavior you would want in some cases; for example, fixing a spelling error.

Second, notice that, because you are just appending, both edits are preserved; the two changes both exist in the stream; it's the view (the fold), not the stream, that decides which of the two edits wins.

Which means, to a degree, you aren't done yet -- you still need to correctly model the strategy for choosing the "winner".

In cases where edits can conflict, then you need to take precautions against allowing a write that didn't acknowledge all of the previously accepted writes. In that case, you don't want to append to a stream, but rather compare and swap.

Digression

Using a traditional database, the act of writing to and reading from records automatically handles the locking for you; it is what the database does. Doing it in CQRS+ES is outside a database, and so I would need to do it manually.

Part of your confusion, I believe, is that you are conflating appending events with updating history. The literature is not clear on this point, so it may be valuable to review an alternative approach to maintaining a history.

Imagine, for a moment, that you preserve your state not as a stream of events, but as a document of events. In the happy path, you load the document, modify your local copy, replace the previous copy with your revision.

In a concurrent write scenario, we have two writers that load the document, make different edits, and then each try to do the replace. What happens? If we don't do anything to mitigate concurrent writes, we probably end up with a last writer wins strategy -- corrupting our history by deleting the edits made by the previous writer.

To ensure that we don't lose writes, we need help from the document store. Specifically, we want some analog of a conditional put (for instance, MongoDB's findAndModify). The second writer should get some flavor of ConcurrentModification response, which it can then mitigate (fail, retry, merge).
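In sketch form, a conditional put over a versioned document might look like this (a hypothetical store; a real store performs the check-and-replace atomically on the server side):

```python
class ConcurrentModification(Exception):
    pass

class DocumentStore:
    def __init__(self):
        self._docs = {}   # doc_id -> (version, document)

    def conditional_put(self, doc_id, expected_version, document):
        # Analog of a conditional update with a version predicate:
        # replace the document only if nobody has replaced it since we
        # read it. A real store makes this check-and-replace atomic.
        version, _ = self._docs.get(doc_id, (0, None))
        if version != expected_version:
            raise ConcurrentModification(doc_id)
        self._docs[doc_id] = (version + 1, document)
```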

Nothing about this changes when we are doing event sourcing as she is spoke.

What does change is the nature of the thing that we "PUT"; because we are accepting the discipline that the representations of the events are immutable, and the collection of events is append only, we've restricted the valid edits of a document to a set that allows us to optimize away transferring the entire document every time.

If you like, you can imagine that we send to the event store a description of the changes we want to make to the document. So the store loads a copy of the document, applies our changes to it, and then stores the updated version.

But the domain model didn't validate that the changes apply to an arbitrary version of the history document, but to a specific version of it. So the message that we send to describe our changes needs to include a reference to the version of the document that we started from.

You can imagine that the "event store" is a linked list in memory. "Append" is analogous to updating the document without ensuring that it is unchanged from what you verified. If we need to make sure that our assumptions are still valid (i.e., optimistic concurrency), then we need to compare-and-swap the tail pointer; if a race has allowed some other writer to update the tail since our read, then the CAS operation should fail, allowing us to recover.
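Continuing that analogy, a compare-and-swap on the tail might be sketched like this (the lock merely stands in for the atomicity a real CAS primitive provides):

```python
import threading

class EventNode:
    def __init__(self, event, prev=None):
        self.event = event
        self.prev = prev   # linked backwards toward the start of history

class Stream:
    def __init__(self):
        self.tail = None
        self._lock = threading.Lock()

    def compare_and_swap(self, expected_tail, event):
        # Append only if the tail is still the node we read; otherwise a
        # racing writer got there first and the caller must recover
        # (reload, re-validate, retry).
        with self._lock:
            if self.tail is not expected_tail:
                return False
            self.tail = EventNode(event, prev=expected_tail)
            return True
```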

A remote event store needs to offer an analogous interface: see the use of expectedVersion in GetEventStore.

You may find it valuable to review Udi Dahan's writings on collaborative domains.

It feels like you would need to implement your own transaction locking capability to ensure you never get overwrites, and that is a complex solution I would rather avoid!

You should keep in mind that you signed up for some sort of locking when you decided to use parallel writers! Doing the work to control how to maintain data integrity is part of the trade-off you signed up for.

The good news is that you don't need to implement a transaction locking strategy; you need to choose a data store that provides one.

Compare and swap really isn't all that complex; it's just complicated.

Both then save events for these actions. But now we have an error, because whichever command finished last will have a total that does not include the new item from the instance that finished first. This has nothing to do with pessimistic/optimistic concurrency but with transactional locking.

Right -- you can't blindly append to a stream where you are expecting an invariant to hold. You have to make sure you are appending to the location that you have checked.

There's an alternative design that you may see from time to time that treats the history as a graph, rather than as a stream. So each write potentially forks the history, and you merge later if necessary. You can get a sense for how that might work by reviewing Kleppmann's work on CRDTs.

My reading tells me that event graphs are still firmly in the complex quadrant.

answered May 14 '23 by VoiceOfUnreason