Concurrency with DDD + CQRS + ES

I've been studying DDD for a while and stumbled onto design patterns like CQRS and Event Sourcing (ES). These patterns can be used to help achieve some of the concepts of DDD with less effort.

Then I started to develop a simple piece of software to implement all these concepts, and began to imagine possible failure paths.

Just to clarify my architecture, the following image describes one request coming from the front end and reaching the controller in the back end (for simplicity I've ignored all filters, binders, etc.).

Sequence Diagram

  1. The actor sends a form with the amount of money he wants to withdraw from an account.
  2. The controller passes the view model to the application layer, where it will be transformed into a command.
  3. The application layer opens a Unit of Work (UOW), maps the VM to a command, and sends the command to the dispatcher.
  4. The dispatcher finds the aggregate class that knows how to handle the command (Account) and asks the factory for the specific instance of the account.
  5. The factory creates a new instance of the account and requests all of its events from the event store.
  6. The event store returns all events of the account.
  7. The factory replays all of the events on the aggregate so that its internal state becomes correct, and returns the instance of the account (steps 7-10 are sketched in code right after this list).
  8. The dispatcher sends the command to the account so it can be handled.
  9. The account checks whether it has enough money for the withdrawal. If it does, it raises a new MoneyWithdrawnEvent.
  10. This event is handled by the aggregate (account), altering its internal state.
  11. The application layer closes the UOW; when it does, the UOW checks all loaded aggregates for new events to save to the event store. If there are any, it sends the events to the repository.
  12. The repository persists the events to the event store.
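
To make the flow concrete, here is a minimal sketch of steps 7-10 in TypeScript. All names (`Account`, `MoneyWithdrawn`, `DomainEvent`) are hypothetical, assumed for illustration rather than taken from the post.

```typescript
// Hypothetical event contract -- not from the original post.
interface DomainEvent {
  type: string;
  amount?: number;
}

class Account {
  private balance = 0;
  readonly uncommittedEvents: DomainEvent[] = [];

  // Step 7: replay past events to rebuild the internal state.
  static rehydrate(history: DomainEvent[]): Account {
    const account = new Account();
    for (const event of history) account.apply(event);
    return account;
  }

  // Steps 8-9: handle the command; raise an event only if the invariant holds.
  withdraw(amount: number): void {
    if (amount > this.balance) throw new Error("Insufficient funds");
    this.raise({ type: "MoneyWithdrawn", amount });
  }

  // Step 10: applying the event is the only thing that mutates state.
  private apply(event: DomainEvent): void {
    if (event.type === "MoneyDeposited") this.balance += event.amount ?? 0;
    if (event.type === "MoneyWithdrawn") this.balance -= event.amount ?? 0;
  }

  private raise(event: DomainEvent): void {
    this.apply(event);
    this.uncommittedEvents.push(event); // picked up by the UOW in steps 11-12
  }
}
```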

There are many layers that can be added, for example: caching of aggregates, caching of events, snapshots, etc.

Sometimes ES is used in parallel with a relational database, so that when the UOW saves the new events that have happened, it also persists the aggregates to the relational database.

One of the benefits of ES is that it has one central source of truth, the event store. So even if the models in memory, or even in the relational database, get corrupted, we can rebuild them from the events.

And having this source of truth, we can build other systems that consume the events in different ways to form different models.
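
As an illustration, here is a minimal sketch of such a consumer: a read-model projection that folds the same events into a completely different model (a per-account balance lookup). The names are hypothetical.

```typescript
// Hypothetical event shape for the projection -- assumed for illustration.
interface AccountEvent {
  type: string;
  accountId: string;
  amount?: number;
}

class BalanceProjection {
  private readonly balances = new Map<string, number>();

  // Replaying the full event stream rebuilds this read model from scratch.
  project(events: AccountEvent[]): void {
    for (const event of events) {
      const current = this.balances.get(event.accountId) ?? 0;
      if (event.type === "MoneyDeposited") {
        this.balances.set(event.accountId, current + (event.amount ?? 0));
      }
      if (event.type === "MoneyWithdrawn") {
        this.balances.set(event.accountId, current - (event.amount ?? 0));
      }
    }
  }

  balanceOf(accountId: string): number {
    return this.balances.get(accountId) ?? 0;
  }
}
```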

However, for this to work, we need the source of truth to be clean and not corrupted; otherwise none of these benefits hold.

That said, if we consider concurrency in the architecture described in the image, there can be some issues:

  • If the actor sends the form twice to the back end in a short period, and the back end starts two threads (one for each request), then they will call the application layer twice, start two UOWs, and so on. This can cause two events to be stored in the event store.

This problem can be handled in many different places:

  1. The front end can control which actions a user/actor can perform and how many times.

  2. The dispatcher can keep a cache of all commands that are being handled, and if there is already a command that refers to the same aggregate (account), it throws an exception.

  3. The repository can create a new instance of the aggregate and replay all events from the event store just before saving, to check whether the version is still the same as the one fetched in step 7.

Problems with each solution:

  1. Front End

    • The user can bypass these constraints by editing some JavaScript.
    • If multiple sessions are open (e.g. different browsers), some static field holding a reference to all open sessions would be necessary, and it would be necessary to lock that static variable in order to access the field.
    • If there are multiple servers for the specific action being performed (horizontal scaling), this static field would not work, because it would have to be shared across all servers, so an extra layer would be necessary (e.g. Redis).
  2. Command Cache

    • For this solution to work, it would be necessary to lock the static command cache when reading from it and when writing to it.

    • If there are multiple servers for the specific use case of the application layer (horizontal scaling), this static cache would not work, because it would have to be shared across all servers, so an extra layer would be necessary (e.g. Redis).

  3. Repository Version check

    • For this solution to work, it would be necessary to lock some static variable before doing the check (the version in the database equals the version fetched in step 7) and saving.

    • If the system were distributed (horizontal scaling), it would be necessary to lock the event store, because otherwise both processes could pass the check (the version in the database equals the version fetched in step 7) and then save one after the other. And depending on the technology, it is not possible to lock the event store, so another layer would be needed to serialize every access to the event store and add the possibility of locking it. (A sketch of this check-then-save approach appears after this list.)

The solutions that lock a static variable are somewhat OK, because they are local and very fast. However, depending on something like Redis adds significant latency, even more so if we are talking about locking access to the database (event store), and more still if this has to be done through yet another service.
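
Here is a minimal sketch of solution 3 with a purely in-process lock (a hypothetical `Mutex` and `EventStore` contract, not real libraries). It works on a single server; as argued above, two processes on different servers could still both pass the check, because the lock is not shared between them.

```typescript
// Tiny in-process mutex: serializes async critical sections within one process only.
class Mutex {
  private tail: Promise<void> = Promise.resolve();

  run<T>(critical: () => Promise<T>): Promise<T> {
    const result = this.tail.then(critical);
    // Keep the chain alive even if the critical section rejects.
    this.tail = result.then(() => undefined, () => undefined);
    return result;
  }
}

// Hypothetical event store contract assumed for this sketch.
interface EventStore {
  load(aggregateId: string): Promise<{ events: unknown[]; version: number }>;
  append(aggregateId: string, events: unknown[], newVersion: number): Promise<void>;
}

class Repository {
  private readonly lock = new Mutex();

  constructor(private readonly store: EventStore) {}

  // Solution 3: re-check the version just before saving, inside the lock.
  async save(aggregateId: string, newEvents: unknown[], versionAtLoad: number): Promise<void> {
    await this.lock.run(async () => {
      const { version } = await this.store.load(aggregateId);
      if (version !== versionAtLoad) {
        throw new Error("Concurrent modification detected");
      }
      await this.store.append(aggregateId, newEvents, version + newEvents.length);
    });
  }
}
```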

I would like to know if there is any other possible solution to handle this problem, because this is a major problem (corruption of the event store), and if there is no way around it, the whole concept seems to be flawed.

I'm open to any change in the architecture. If, for instance, one solution is to add an event bus so that everything gets funneled through it, that's fine, but I can't see how this solves the problem.

Another point I'm not familiar with is Kafka. I don't know whether Kafka provides some solution for this problem.

Rodrigo Riskalla Leal


2 Answers

Although all the solutions that you provided could work in some specific scenarios, I think the last solution (3.2) works for the more general use case. I use it in my open source framework and it works very well.

So, the Event store is the one responsible for ensuring that an Aggregate is not mutated at the same time by two commands.

One way of doing it is to use optimistic locking. When the Aggregate is loaded from the Event store, you remember its version. When you persist the events, you try to append them at version + 1. You must have a unique index per AggregateType-AggregateId-version. If the append fails, you should retry the whole process (load + handle + append).

I think this is the most scalable solution, as it works even with sharding, when the sharding key is a subset of the AggregateId.
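
A minimal sketch of that load + handle + append retry loop, using a hypothetical `EventStore` contract (the store is assumed to reject an append whose expected version no longer matches, e.g. via the unique index described above) and the `Account` aggregate sketched in the question:

```typescript
interface DomainEvent { type: string; amount?: number; }

class ConcurrencyError extends Error {}

// Hypothetical event store contract assumed for this sketch.
interface EventStore {
  loadStream(aggregateId: string): Promise<{ events: DomainEvent[]; version: number }>;
  // Rejects with ConcurrencyError when expectedVersion does not match the stored version.
  appendToStream(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;
}

// Retry the whole load + handle + append cycle when the optimistic check fails.
async function withdraw(store: EventStore, accountId: string, amount: number, maxRetries = 3): Promise<void> {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const { events, version } = await store.loadStream(accountId); // load
    const account = Account.rehydrate(events);                     // rebuild state (Account from the earlier sketch)
    account.withdraw(amount);                                      // handle; throws if the invariant is violated
    try {
      await store.appendToStream(accountId, account.uncommittedEvents, version); // append at expected version
      return;
    } catch (error) {
      if (!(error instanceof ConcurrencyError)) throw error;
      // Another command appended in the meantime: reload and try again.
    }
  }
  throw new Error("Could not complete the withdrawal after retries");
}
```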

You could easily use MongoDB as an EventStore. In MongoDB <= 3.6 you could append all the events atomically by inserting a single document with a nested document containing the array of events.
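
A sketch of that approach with the official MongoDB Node.js driver (collection and field names are made up): each commit is one document carrying the array of events, and a unique index makes a conflicting append fail with a duplicate-key error.

```typescript
import { MongoClient } from "mongodb";

async function appendCommit(client: MongoClient, aggregateId: string, version: number, events: object[]): Promise<void> {
  const commits = client.db("eventstore").collection("commits");

  // One-time setup in practice: the unique index is what enforces optimistic concurrency.
  await commits.createIndex({ aggregateType: 1, aggregateId: 1, version: 1 }, { unique: true });

  try {
    // The whole commit is a single document, so the append of all its events is atomic.
    await commits.insertOne({
      aggregateType: "Account",
      aggregateId,
      version,              // the version this writer expects to create (loaded version + 1)
      events,               // nested array of events
      committedAt: new Date(),
    });
  } catch (error: any) {
    if (error?.code === 11000) {
      // Duplicate key: another command already appended this version -- retry load + handle + append.
      throw new Error("Concurrency conflict");
    }
    throw error;
  }
}
```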

Another solution is to use pessimistic locking. You start a transaction before you load the Aggregate, append the events, increase its version and commit. You need to use 2 tables/collections, one for the Aggregate metadata+version and one for the actual events. MongoDB >= 4.0 has transactions.
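
A rough sketch of the pessimistic variant with MongoDB >= 4.0 transactions, using two collections with hypothetical names (`aggregates` for metadata + version, `events` for the actual events). Two concurrent transactions touching the same metadata document conflict, and only one of them commits.

```typescript
import { MongoClient } from "mongodb";

async function appendWithTransaction(client: MongoClient, aggregateId: string, newEvents: object[]): Promise<void> {
  const db = client.db("eventstore");
  const aggregates = db.collection("aggregates"); // aggregate metadata + current version
  const events = db.collection("events");         // the actual events

  const session = client.startSession();
  try {
    await session.withTransaction(async () => {
      const meta = await aggregates.findOne({ aggregateId }, { session });
      const version = meta?.version ?? 0;

      // Writing the metadata document is what serializes concurrent writers:
      // MongoDB aborts one of two conflicting transactions.
      await aggregates.updateOne(
        { aggregateId },
        { $set: { version: version + newEvents.length } },
        { upsert: true, session }
      );

      await events.insertMany(
        newEvents.map((event, i) => ({ aggregateId, version: version + i + 1, event })),
        { session }
      );
    });
  } finally {
    await session.endSession();
  }
}
```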

In both these solutions, the Event store does not get corrupted.

Another point I'm not familiar with is Kafka. I don't know whether Kafka provides some solution for this problem.

You can use Kafka with Event sourcing but you need to change your architecture. See this answer.

Constantin Galbenu


Short answer: atomic transactions are still a thing.

Longer answer: to handle concurrent writes correctly, you either need a lock, or you need conditional writes (aka compare and swap).

Using a log: we would need to acquire the lock before step 6, and release the lock after step 12.

Using a conditional write: at step 6, the repository would capture a concurrency predicate (which could be implicit -- for instance, the count of events read). When performing the write at step 12, the concurrency predicate would be checked to ensure that there had been no concurrent modifications.

For example, the HTTP API for Event Store uses the ES-ExpectedVersion header; the client is responsible for computing (from the events that it has fetched) where it expects the write to occur.
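
A sketch of what such a conditional write could look like against the (older) Event Store HTTP API; the stream name, port, event payload, and exact status handling are assumptions for illustration:

```typescript
// Append one event to the "account-42" stream, expecting the stream to currently be at version 5.
// If another writer got there first, the server rejects the write instead of appending.
async function appendWithExpectedVersion(): Promise<void> {
  const response = await fetch("http://localhost:2113/streams/account-42", {
    method: "POST",
    headers: {
      "Content-Type": "application/vnd.eventstore.events+json",
      "ES-ExpectedVersion": "5",
    },
    body: JSON.stringify([
      {
        eventId: "e7a0c6e2-9a3e-4d3a-9b9a-1f2d3c4b5a69",
        eventType: "MoneyWithdrawn",
        data: { accountId: "42", amount: 100 },
      },
    ]),
  });

  if (!response.ok) {
    // A mismatch of ES-ExpectedVersion comes back as an error response:
    // reload the aggregate and retry, or report the conflict to the caller.
    throw new Error(`Append rejected: ${response.status}`);
  }
}
```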

Gabriel Schenker describes both an RDBMS repository and an event store repository in his 2015 essay Event Sourcing applied -- the Repository.

Of course, with the introduction of the conditional write, you should give thought to what you want the model to do when the write fails. You might introduce a retry strategy (go to step 6), or try a merge strategy, or simply fail and return to sender.

In your example of a conditional write, I assume it would be necessary to add a lock in step 11 (so that it locks the event store to fetch the concurrency predicate), and to release the lock only after writing the new events to the event store. Otherwise, two concurrent processes could pass the concurrency predicate check and both save their events.

Not necessarily.

If your persistence store provides locks, but not conditional writes, then you have the right idea: in step 12, the repository would acquire a lock, check the precondition, commit the new events, and release the lock.

But a persistence appliance that understands conditional writes can implement that check for you. Using Event Store, the repository doesn't need to acquire a lock. It sends the events, with metadata about the expected state, to the store. The event store itself uses that information to perform the conditional write.

There's no magic - somebody needs to do the work to ensure that concurrent writes don't clobber each other. But it doesn't necessarily have to be in your code.

Note that I'm using "Repository" as described by Eric Evans in the blue book: it's the abstraction that hides your choice of how to store events from the rest of the system; in other words, it's the adapter that makes your event store look like an in-memory collection of events. It's not the event store itself.

VoiceOfUnreason