
CQRS read side, multiple event stream topics, concurrency / race conditions

I'm having an issue with (re)applying events from multiple topics in the correct order on the read / query side.

Example:

On the write / command side, we have 2 Aggregates with an n:m relationship:

  • Contact
  • Group

These Aggregates produce the following events on 2 separate event stream topics (because best practice says: one topic per aggregate, and I totally agree):

  • Contact Topic:

    1. ContactCreated (contactId: "123", name: "Peter")
    2. ContactAddedToGroup (contactId: "123", groupId: "456")
  • Group Topic:

    3. GroupCreated (groupId: "456", name: "Customers")

On the read / query side (e.g. Elasticsearch) I'd like to execute this query:

  • Find all Contacts that belong to any Group whose name begins with Custo...
  • Find all Groups whose name begins with Custo... (this shouldn't be a problem at all)

To achieve this, there are 2 read models. Example Data:

  • {contactId: "123", name: "Peter", groups: [{id: "456", name: "Customers"}]}
  • {groupId: "456", name: "Customers"}
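To make the two read models and the two queries concrete, here is a minimal in-memory sketch. It is not Elasticsearch: the class names and the functions `contacts_in_groups_with_prefix` and `groups_with_prefix` are hypothetical, and the `startswith` filter only imitates a prefix query. The field names come from the example data. A group reference's `name` is `Optional` because, as the problem below shows, it may not be known yet.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class GroupRef:
    id: str
    name: Optional[str]  # denormalized copy of the group's name; None if not known yet


@dataclass
class ContactView:
    contactId: str
    name: str
    groups: List[GroupRef] = field(default_factory=list)


@dataclass
class GroupView:
    groupId: str
    name: str


def contacts_in_groups_with_prefix(contacts: List[ContactView], prefix: str) -> List[ContactView]:
    # A contact matches if any of its group references carries a matching name.
    return [
        c for c in contacts
        if any(g.name is not None and g.name.startswith(prefix) for g in c.groups)
    ]


def groups_with_prefix(groups: List[GroupView], prefix: str) -> List[GroupView]:
    return [g for g in groups if g.name.startswith(prefix)]


peter = ContactView("123", "Peter", [GroupRef("456", "Customers")])
customers = GroupView("456", "Customers")
print([c.name for c in contacts_in_groups_with_prefix([peter], "Custo")])  # ['Peter']
print([g.groupId for g in groups_with_prefix([customers], "Custo")])  # ['456']
```

The first query only works because the contact document carries a copy of the group name: a contact whose `GroupRef.name` is still `None` silently drops out of the results.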

The Problem:

The order of events is only guaranteed within a single event topic (in Apache Kafka, strictly within a single partition of a topic). So the 3 events can be consumed by the read / query side in several orders: 1,2,3 or 1,3,2 or 3,1,2.
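Why exactly these three orders? A consumer may interleave the two topics arbitrarily, but it must keep each topic's own order, so 2 can never come before 1. The hypothetical helper below enumerates every order-preserving merge of the two example streams:

```python
from typing import List, Sequence


def interleavings(a: Sequence[str], b: Sequence[str]) -> List[List[str]]:
    """All merges of a and b that preserve the internal order of each stream."""
    if not a:
        return [list(b)]
    if not b:
        return [list(a)]
    # Either the next event comes from a, or it comes from b.
    a_next = [[a[0]] + rest for rest in interleavings(a[1:], b)]
    b_next = [[b[0]] + rest for rest in interleavings(a, b[1:])]
    return a_next + b_next


contact_topic = ["1", "2"]  # ContactCreated, ContactAddedToGroup
group_topic = ["3"]         # GroupCreated

for order in interleavings(contact_topic, group_topic):
    print(",".join(order))
```

This prints `1,2,3`, `1,3,2` and `3,1,2`. Only in the first one is ContactAddedToGroup (2) consumed before GroupCreated (3).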

How should the order 1,2,3 be handled? Example database pseudo statements:

  1. INSERT Contact (contactId: "123", name: "Peter")
    • FIND Group WHERE (groupId: "456") (doesn't work, because the Group hasn't been inserted yet)
    • UPDATE Contact WHERE (contactId: "123") ADD Group (groupId: "456", name: "???") (here's the problem)
  2. INSERT Group (groupId: "456", name: "Customers")
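The naive handler behind those pseudo statements can be sketched as follows. Events are modeled as hypothetical `(type, payload)` tuples, and the read models as plain dicts standing in for Elasticsearch indices. Fed the order 1,2,3, it leaves the group name empty; fed 1,3,2, it works.

```python
from typing import Dict, List, Tuple

Event = Tuple[str, dict]  # (event type, payload): a hypothetical encoding


def naive_project(events: List[Event]) -> Dict[str, dict]:
    contacts: Dict[str, dict] = {}
    groups: Dict[str, dict] = {}
    for kind, data in events:
        if kind == "ContactCreated":
            contacts[data["contactId"]] = {"contactId": data["contactId"], "name": data["name"], "groups": []}
        elif kind == "ContactAddedToGroup":
            group = groups.get(data["groupId"])  # FIND Group: None if GroupCreated not seen yet
            contacts[data["contactId"]]["groups"].append(
                {"id": data["groupId"], "name": group["name"] if group else None}
            )
        elif kind == "GroupCreated":
            groups[data["groupId"]] = {"groupId": data["groupId"], "name": data["name"]}
    return contacts


e1 = ("ContactCreated", {"contactId": "123", "name": "Peter"})
e2 = ("ContactAddedToGroup", {"contactId": "123", "groupId": "456"})
e3 = ("GroupCreated", {"groupId": "456", "name": "Customers"})

print(naive_project([e1, e2, e3])["123"]["groups"])  # [{'id': '456', 'name': None}]
print(naive_project([e1, e3, e2])["123"]["groups"])  # [{'id': '456', 'name': 'Customers'}]
```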

Idea(s):

  • I could extend the algorithm and append one more statement to the GroupCreated step. It looks up all Contacts that have already been added to the Group and sets the Group name on them (so that the search query works):

    1. UPDATE Contact WHERE (groupId: "456") REPLACE Group (groupId: "456", name: "Customers")
  • Another idea (which I don't like) would be to use only a single event stream topic. Then the order of events would always be correct. But there will be cases where this isn't easily possible. (Also, best practice says one should use one topic per aggregate.)

  • Ignore the problem, because it's pretty unlikely to happen: the User naturally provides the necessary delay between Create Group and Add Contact To Group. But during Event Replay there's no such delay, and Event Topics can be consumed in parallel / 'random' order.
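The first idea makes the projection order-independent: whichever of ContactAddedToGroup and GroupCreated arrives second fills in the name. A minimal in-memory sketch, using the same hypothetical `(type, payload)` event tuples; the class name is invented for illustration:

```python
from typing import Dict, Tuple

Event = Tuple[str, dict]  # (event type, payload): a hypothetical encoding


class BackfillingProjection:
    def __init__(self) -> None:
        self.contacts: Dict[str, dict] = {}
        self.groups: Dict[str, dict] = {}

    def apply(self, event: Event) -> None:
        kind, data = event
        if kind == "ContactCreated":
            self.contacts[data["contactId"]] = {"contactId": data["contactId"], "name": data["name"], "groups": []}
        elif kind == "ContactAddedToGroup":
            group = self.groups.get(data["groupId"])
            # If GroupCreated has not been consumed yet, store the reference with no name.
            self.contacts[data["contactId"]]["groups"].append(
                {"id": data["groupId"], "name": group["name"] if group else None}
            )
        elif kind == "GroupCreated":
            self.groups[data["groupId"]] = {"groupId": data["groupId"], "name": data["name"]}
            # The extra statement: UPDATE Contact WHERE (groupId) REPLACE Group (..., name).
            for contact in self.contacts.values():
                for ref in contact["groups"]:
                    if ref["id"] == data["groupId"]:
                        ref["name"] = data["name"]


e1 = ("ContactCreated", {"contactId": "123", "name": "Peter"})
e2 = ("ContactAddedToGroup", {"contactId": "123", "groupId": "456"})
e3 = ("GroupCreated", {"groupId": "456", "name": "Customers"})

for order in ([e1, e2, e3], [e1, e3, e2], [e3, e1, e2]):
    projection = BackfillingProjection()
    for event in order:
        projection.apply(event)
    print(projection.contacts["123"]["groups"])  # [{'id': '456', 'name': 'Customers'}] each time
```

The scan over all contacts is where a real store would presumably run something like an Elasticsearch update-by-query filtered on the group id. The sketch assumes a single consumer applying events one at a time; with parallel consumers, the lookup and the backfill can race, as the answer below explains.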

Question(s):

This scenario should be fairly common. But unfortunately there are very few real-world CQRS examples on the web, and most of them don't explain the small / hidden pitfalls.

How do you solve those problems?

asked Nov 25 '17 05:11 by Benjamin M


1 Answer

In your example, you are guaranteed that the GroupCreated event (3) was written before ContactAddedToGroup (2), since obviously the user cannot add a contact to a group before the group has been created. So the GroupCreated event will already be available to read, even if you happen to read the ContactAddedToGroup event first.

Sticking with the 2 separate streams (which is definitely correct, since Groups and Contacts are separate aggregates), this is one approach:

  • The contact projection can maintain its own table of group names (just id and name columns are needed in your example). Or, if you're happy to couple Groups and Contacts (they sound like they're in the same bounded context), you can have a single event handler deal with both the Group and Contact projections.
  • The projection handler subscribes to both Group and Contact events (from a single process and thread).
  • If it reads a ContactAddedToGroup event for a group that isn't in its group names table yet, it immediately performs a catch-up on the group events (or at least catches up far enough to get that group) and then processes the contact event again.
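A minimal single-threaded sketch of this approach, with both topics modeled as in-memory lists and read positions standing in for consumer offsets. The class and method names and the `(type, payload)` event encoding are hypothetical. When a ContactAddedToGroup event references an unknown group, the handler reads ahead on the Group stream until the group appears, then applies the contact event. The contact offset only advances once the event has been applied, so a failed catch-up leaves the event to be processed again.

```python
from typing import Dict, List, Tuple

Event = Tuple[str, dict]  # (event type, payload): a hypothetical encoding


class ContactsProjection:
    """One handler, one thread, subscribed to both the Group and Contact streams."""

    def __init__(self, group_stream: List[Event], contact_stream: List[Event]) -> None:
        self.group_stream = group_stream
        self.contact_stream = contact_stream
        self.group_pos = 0    # next unread offset in each stream
        self.contact_pos = 0
        self.group_names: Dict[str, str] = {}  # the projection's own group-name table
        self.contacts: Dict[str, dict] = {}

    def step_group(self) -> bool:
        """Apply the next Group event; return False if there is none."""
        if self.group_pos >= len(self.group_stream):
            return False
        kind, data = self.group_stream[self.group_pos]
        if kind == "GroupCreated":
            self.group_names[data["groupId"]] = data["name"]
        self.group_pos += 1
        return True

    def step_contact(self) -> bool:
        """Apply the next Contact event; return False if there is none."""
        if self.contact_pos >= len(self.contact_stream):
            return False
        kind, data = self.contact_stream[self.contact_pos]
        if kind == "ContactCreated":
            self.contacts[data["contactId"]] = {"contactId": data["contactId"], "name": data["name"], "groups": []}
        elif kind == "ContactAddedToGroup":
            group_id = data["groupId"]
            # Unknown group: catch up on the Group stream until it shows up.
            while group_id not in self.group_names:
                if not self.step_group():
                    # In memory, an exhausted stream means the group is missing;
                    # a real consumer would fetch or wait for more Group events.
                    raise LookupError(f"group {group_id} not found")
            self.contacts[data["contactId"]]["groups"].append({"id": group_id, "name": self.group_names[group_id]})
        self.contact_pos += 1  # advance only once the event has been applied
        return True


contact_stream = [
    ("ContactCreated", {"contactId": "123", "name": "Peter"}),
    ("ContactAddedToGroup", {"contactId": "123", "groupId": "456"}),
]
group_stream = [("GroupCreated", {"groupId": "456", "name": "Customers"})]

projection = ContactsProjection(group_stream, contact_stream)
while projection.step_contact():  # drain the Contact stream before touching Group
    pass
print(projection.contacts["123"]["groups"])  # [{'id': '456', 'name': 'Customers'}]
```

The demo drains the Contact stream before reading any Group event, which is the awkward 1,2,3 order, yet the name is filled in. The replay variant of consuming the parent stream first amounts to running `while projection.step_group(): pass` before the contact loop; `step_contact` still catches up if new Group events arrive in the meantime.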

This approach works during replay as well as during live processing. During replay, you can also choose to consume parent streams (e.g. Group) completely before you even start consuming the projection's main stream (Contact in this case), though you still need to be prepared to catch up again on the Group stream if necessary, since new events may come in during the catch-up.

The single thread also ensures there is no race condition if you have GroupRenamed events: you can be sure you will update the group name in all contacts, whereas with multiple threads you might have a race between inserting a contact with the old group name and the query that updates the group name in all contacts that use that group. If you need crazy scale, you'll have to shard your contacts and have each shard maintain its own group name table, to avoid race conditions.
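The race can be reproduced deterministically. The sketch below assumes a hypothetical `GroupRenamed` payload (`groupId`, `name`) and a hypothetical contact 789 being added to the group; contacts are created implicitly to keep it short. Applied one at a time, in either arrival order, the contact ends up with the new name. With two workers interleaved as described above (lookup, rename, write), the stale name wins.

```python
from typing import Dict, Tuple

Event = Tuple[str, dict]  # (event type, payload): a hypothetical encoding


def apply_event(event: Event, group_names: Dict[str, str], contacts: Dict[str, dict]) -> None:
    kind, data = event
    if kind == "GroupRenamed":
        group_names[data["groupId"]] = data["name"]
        # Update the denormalized name in every contact that references the group.
        for contact in contacts.values():
            for ref in contact["groups"]:
                if ref["id"] == data["groupId"]:
                    ref["name"] = data["name"]
    elif kind == "ContactAddedToGroup":
        # setdefault stands in for ContactCreated to keep the sketch short.
        contact = contacts.setdefault(data["contactId"], {"groups": []})
        contact["groups"].append({"id": data["groupId"], "name": group_names[data["groupId"]]})


added = ("ContactAddedToGroup", {"contactId": "789", "groupId": "456"})
renamed = ("GroupRenamed", {"groupId": "456", "name": "Clients"})

# Single thread: each event is applied completely before the next one starts.
for order in ([added, renamed], [renamed, added]):
    names, contacts = {"456": "Customers"}, {}
    for event in order:
        apply_event(event, names, contacts)
    print(contacts["789"]["groups"][0]["name"])  # Clients (both orders)

# Two workers, badly interleaved: A looks up the name, B renames, A writes.
names, contacts = {"456": "Customers"}, {}
stale_name = names["456"]                  # worker A: lookup for `added`
apply_event(renamed, names, contacts)      # worker B: no contact 789 to update yet
contacts["789"] = {"groups": [{"id": "456", "name": stale_name}]}  # worker A: write
print(contacts["789"]["groups"][0]["name"])  # Customers (stale)
```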

The other approach is to decide that Group Name is allowed to be null, and just do the contacts update when you read the event (your first idea). So you'll treat new groups and group renames (if allowed) in much the same way, but your clients will need to deal with temporarily null group names in contacts, which may be an unwelcome complication.

answered Nov 06 '22 16:11 by TomW