 

CQRS: project out-of-order notifications in an ElasticSearch read model

We have a microservice architecture and apply the CQRS pattern. A command sent to a microservice triggers an application state change and the emission of the corresponding event on our Kafka bus. We project these events in a read model built with ElasticSearch.

So far, so good.

Our microservices are eventually consistent with each other, but at any given moment they are not necessarily consistent. Consequently, the events they send are not always consistent with each other either.

Moreover, to guarantee coherence between an application state change and the emission of the corresponding event, we persist the new state and the corresponding event in the database in the same transaction (I am aware that we could use event sourcing and avoid persisting the state altogether). An asynchronous worker is then responsible for sending these events on the Kafka bus. This pattern guarantees that each event is sent at least once for each state change (duplicates are not an issue since our events are idempotent). However, since each microservice has its own event table and asynchronous worker, we cannot guarantee that events will be sent in the sequence in which the corresponding state changes occurred in their respective microservices.

EDIT: to clarify, each microservice has its own database, its own event table and its own worker. A given worker processes the events in the order in which they were persisted in its own event table, but workers on different event tables, i.e. for distinct microservices, give no such guarantee relative to one another.
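The persistence scheme described above (state and event written in one transaction, then relayed by a worker) can be sketched as follows. This is a minimal illustration, not the actual implementation: SQLite stands in for the service database, the table names `aggregate_b` and `outbox`, the event type string `BBoundToC` and the `publish` callable (a stand-in for a Kafka producer) are all assumptions made for the example.

```python
import json
import sqlite3


def open_db() -> sqlite3.Connection:
    """Create an in-memory database with a state table and an event (outbox) table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE aggregate_b (id TEXT PRIMARY KEY, c_ids TEXT NOT NULL)")
    conn.execute(
        "CREATE TABLE outbox ("
        " seq INTEGER PRIMARY KEY AUTOINCREMENT,"
        " payload TEXT NOT NULL,"
        " sent INTEGER NOT NULL DEFAULT 0)"
    )
    return conn


def bind_b_to_c(conn: sqlite3.Connection, b_id: str, c_id: str) -> None:
    """Persist the new state of B and the matching event in a single transaction."""
    with conn:  # commits both writes together, or rolls both back on error
        row = conn.execute(
            "SELECT c_ids FROM aggregate_b WHERE id = ?", (b_id,)
        ).fetchone()
        c_ids = json.loads(row[0]) if row else []
        if c_id not in c_ids:
            c_ids.append(c_id)
        conn.execute(
            "INSERT OR REPLACE INTO aggregate_b (id, c_ids) VALUES (?, ?)",
            (b_id, json.dumps(c_ids)),
        )
        event = {"type": "BBoundToC", "b": b_id, "c": c_id}
        conn.execute("INSERT INTO outbox (payload) VALUES (?)", (json.dumps(event),))


def relay_once(conn: sqlite3.Connection, publish) -> int:
    """Worker step: publish unsent events in insertion order, then mark them sent.

    If the process dies after publish() but before the UPDATE commits, the event
    is published again on the next run, hence at-least-once delivery.
    """
    rows = conn.execute(
        "SELECT seq, payload FROM outbox WHERE sent = 0 ORDER BY seq"
    ).fetchall()
    for seq, payload in rows:
        publish(json.loads(payload))  # hypothetical stand-in for a Kafka send
        with conn:
            conn.execute("UPDATE outbox SET sent = 1 WHERE seq = ?", (seq,))
    return len(rows)
```

Within one service the `ORDER BY seq` keeps events in persistence order; nothing in this scheme orders one service's outbox relative to another's.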

The problem arises when projecting these incoherent or out-of-sequence events from different microservices in the same ElasticSearch document.

A concrete example: let's imagine three different aggregates A, B and C (aggregates in the Domain-Driven Design sense) managed by different microservices:

  • There is a many-to-many relation between A and B. Aggregate A references the aggregate roots B it is bound to, but B is unaware of its relationships with A. When B is deleted, the microservice managing A listens for the corresponding event and undoes the binding of A with B.
  • Similarly, there is a many-to-many relation between B and C. B knows of all related C aggregates, but the inverse is not true. When C is deleted, the microservice managing B listens for the corresponding event and undoes the binding of B with C.
  • C has a property "name".

One of the use cases is to find, through ElasticSearch, all aggregates A that are bound to an aggregate B that is in turn bound to an aggregate C with a specific name.
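As an illustration of that use case, the search could be expressed as a two-level nested query, assuming each A document embeds its B aggregates in a `nested` field and each B embeds its C aggregates the same way. The field names `bs`, `bs.cs` and `bs.cs.name` (mapped as a keyword) are hypothetical; the real mapping may differ.

```python
def find_a_by_c_name_query(c_name: str) -> dict:
    """Build an Elasticsearch query body matching A documents that embed
    a B which in turn embeds a C with the given name.

    Assumes a hypothetical mapping where `bs` and `bs.cs` are nested fields
    and `bs.cs.name` is a keyword field.
    """
    return {
        "query": {
            "nested": {
                "path": "bs",
                "query": {
                    "nested": {
                        "path": "bs.cs",
                        "query": {"term": {"bs.cs.name": c_name}},
                    }
                },
            }
        }
    }
```

The returned dictionary is the request body for a search on the index holding the A documents. Such a query only gives correct results if the embedded B and C data are kept up to date by the projection.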

As explained above, the separate event tables and workers could introduce variable delays between the emission of events from different microservices. Creating A, B and C and binding them together could for example result in the following sequence of events:

  1. B created
  2. B bound to C
  3. C created with name XYZ
  4. A created
  5. A bound to B

Another example of a batch of events: let's suppose we initially have aggregates B and C and two commands are issued simultaneously:

  • delete C
  • bind B to C

This could result in the following events:

  1. C deleted
  2. B bound to C
  3. B unbound from C (in response to event 1)
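Both sequences can be replayed against a deliberately naive projector that rejects any binding to a C it does not currently hold. This is only a sketch to reproduce the failure: the class, the event type strings and the in-memory dictionaries (standing in for ElasticSearch documents) are hypothetical.

```python
class MissingReference(Exception):
    """Raised when an event points at an aggregate the read model does not hold."""


class NaiveProjector:
    """Read model that enforces references, like a foreign key would."""

    def __init__(self):
        self.b_docs = {}  # b_id -> set of bound c_ids
        self.c_docs = {}  # c_id -> name

    def apply(self, event):
        kind = event["type"]
        if kind == "BCreated":
            self.b_docs[event["b"]] = set()
        elif kind == "CCreated":
            self.c_docs[event["c"]] = event["name"]
        elif kind == "CDeleted":
            self.c_docs.pop(event["c"], None)
        elif kind == "BBoundToC":
            if event["c"] not in self.c_docs:
                raise MissingReference(f"C {event['c']} is unknown")
            self.b_docs[event["b"]].add(event["c"])
        elif kind == "BUnboundFromC":
            self.b_docs[event["b"]].discard(event["c"])


def first_failure(events):
    """Return the 1-based position of the first rejected event, or None."""
    projector = NaiveProjector()
    for position, event in enumerate(events, start=1):
        try:
            projector.apply(event)
        except MissingReference:
            return position
    return None


# First sequence: C is created after B is already bound to it.
CREATION = [
    {"type": "BCreated", "b": "b1"},
    {"type": "BBoundToC", "b": "b1", "c": "c1"},
    {"type": "CCreated", "c": "c1", "name": "XYZ"},
]

# Second sequence: B and C exist, then "delete C" and "bind B to C" race.
DELETION = [
    {"type": "BCreated", "b": "b1"},
    {"type": "CCreated", "c": "c1", "name": "XYZ"},
    {"type": "CDeleted", "c": "c1"},
    {"type": "BBoundToC", "b": "b1", "c": "c1"},
    {"type": "BUnboundFromC", "b": "b1", "c": "c1"},
]

print(first_failure(CREATION))  # 2: C does not exist yet
print(first_failure(DELETION))  # 4: C does not exist anymore
```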

Concretely, we have trouble projecting these events in ElasticSearch document(s) because the events sometimes reference aggregates that do not exist anymore or do not exist yet. Any help would be appreciated.

Odsh asked Nov 27 '17 17:11



1 Answer

I don't think the problem you raise is exclusive to the projection part of your system - it can also happen between microservices A, B and C.

Normally, the projector receives "C created" at the same time as B does. Only then can B bind itself to C, so the specific order you mentioned should not be able to reach the projector.

However, you're right to say that the messages could arrive in the wrong order if, for instance, the network communication between B and C is considerably faster than between C and the projector.

I've never come across such a problem, but a few options come to mind:

  • Don't enforce "foreign keys" at the read model level. Store B with its C reference even if you know very little about C for now. In other words, make B bound to C and C created commutative.

  • Add a causation ID to your events. This allows a client to recognize and deal with out-of-order messages. You can choose your own policy: reject, wait for the causation event to arrive, try to process anyway, etc. That is not trivial to implement, though.

  • Messaging platforms can guarantee ordering under certain conditions. Kafka, which you mentioned, preserves order only within a single partition of a topic. RabbitMQ, I think, has even stronger prerequisites.

    I'm not a messaging expert, but the inter-microservice communication scenarios where this would be feasible look limited. It also seems to go against the current trend in eventual consistency, where we tend to favor commutative operations (see CRDTs) over ensuring total order.
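To make the first option more concrete, here is a minimal sketch of a projector that does not enforce references, so that events coming from the B service and events coming from the C service commute with each other. Everything in it is an assumption for illustration: the event type strings, the in-memory dictionaries standing in for ElasticSearch documents, and the tombstone set used to ignore a redelivered "C created" after "C deleted". It relies on each service's own events arriving in order, as stated in the question.

```python
class TolerantProjector:
    """Read model that stores references as-is and resolves them at query time."""

    def __init__(self):
        self.b_to_c = {}        # b_id -> set of referenced c_ids (may be unknown)
        self.c_names = {}       # c_id -> name, only for live C aggregates
        self.deleted_c = set()  # tombstones for deleted C aggregates

    def apply(self, event):
        kind = event["type"]
        if kind == "BCreated":
            self.b_to_c.setdefault(event["b"], set())
        elif kind == "BBoundToC":
            # No existence check: keep the reference even if C is unknown or gone.
            self.b_to_c.setdefault(event["b"], set()).add(event["c"])
        elif kind == "BUnboundFromC":
            self.b_to_c.setdefault(event["b"], set()).discard(event["c"])
        elif kind == "CCreated":
            if event["c"] not in self.deleted_c:  # ignore late duplicates
                self.c_names[event["c"]] = event["name"]
        elif kind == "CDeleted":
            self.c_names.pop(event["c"], None)
            self.deleted_c.add(event["c"])

    def bs_bound_to_c_named(self, name):
        """Return ids of B bound to at least one live C with the given name."""
        return {
            b_id
            for b_id, c_ids in self.b_to_c.items()
            if any(self.c_names.get(c_id) == name for c_id in c_ids)
        }


def replay(events):
    """Apply events in the given order and return the resulting projector."""
    projector = TolerantProjector()
    for event in events:
        projector.apply(event)
    return projector


b_created = {"type": "BCreated", "b": "b1"}
b_bound = {"type": "BBoundToC", "b": "b1", "c": "c1"}
b_unbound = {"type": "BUnboundFromC", "b": "b1", "c": "c1"}
c_created = {"type": "CCreated", "c": "c1", "name": "XYZ"}
c_deleted = {"type": "CDeleted", "c": "c1"}

# "B bound to C" before or after "C created": same final read model.
early = replay([b_created, b_bound, c_created])
late = replay([c_created, b_created, b_bound])

# "B bound to C" after "C deleted": the dangling reference is simply not matched.
raced = replay([b_created, c_created, c_deleted, b_bound])
```

Here `early` and `late` both answer the "XYZ" query with `b1`, while `raced` answers with nothing even before "B unbound from C" arrives. The price is that the read model may temporarily hold references to aggregates it knows nothing about, which queries have to tolerate.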

guillaume31 answered Oct 28 '22 05:10
