
Handling out of order events in CQRS read side

I've read this nice post from Jonathan Oliver about handling out of order events.

http://blog.jonathanoliver.com/cqrs-out-of-sequence-messages-and-read-models/

The solution that we use is to dequeue a message and to place it in a “holding table” until all messages with a previous sequence are received. When all previous messages have been received we take all messages out of the holding table and run them in sequence through the appropriate handlers. Once all handlers have been executed successfully, we remove the messages from the holding table and commit the updates to the read models.

This works for us because the domain publishes events and marks them with the appropriate sequence number. Without this, the solution below would be much more difficult—if not impossible.

This solution is using a relational database as a persistence storage mechanism, but we’re not using any of the relational aspects of the storage engine. At the same time, there’s a caveat in all of this. If message 2, 3, and 4 arrive but message 1 never does, we don’t apply any of them. The scenario should only happen if there’s an error processing message 1 or if message 1 somehow gets lost. Fortunately, it’s easy enough to correct any errors in our message handlers and re-run the messages. Or, in the case of a lost message, to re-build the read models from the event store directly.
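A minimal sketch of the holding-table idea from the quote, assuming sequence numbers start at 1 and increase by 1. The class `HoldingBuffer` and its method names are hypothetical, and an in-memory `TreeMap` stands in for the database table, so the transactional commit described in the quote is not shown:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the "holding table"; all names are illustrative.
final class HoldingBuffer {
    private final TreeMap<Long, String> held = new TreeMap<>(); // sequence -> event payload
    private long nextExpected = 1; // assumption: sequence numbers start at 1

    // Stores the event and returns every event that is now ready, in sequence order.
    List<String> accept(long sequence, String payload) {
        List<String> ready = new ArrayList<>();
        if (sequence < nextExpected) {
            return ready; // already applied earlier; ignore the duplicate
        }
        held.put(sequence, payload);
        // Release the contiguous run that starts at nextExpected.
        while (held.containsKey(nextExpected)) {
            ready.add(held.remove(nextExpected));
            nextExpected++;
        }
        return ready;
    }

    // Number of events still waiting for an earlier one.
    int heldCount() {
        return held.size();
    }
}
```

With this sketch, events 2 and 3 stay held until event 1 arrives, at which point all three are returned in order for the handlers to process.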

I have a few questions, particularly about how he says we can always ask the event store for missing events.

  1. Does the write side of CQRS have to expose a service for the read side to "demand" replaying of events? For example, if event 1 was not received but 2, 4, and 3 have been, can we ask the event store through a service to republish events starting from 1?
  2. Is this service the responsibility of the write side of CQRS?
  3. How do we re-build the read model using this?
Dasith Wijes asked Nov 20 '16 11:11

2 Answers

If you have a sequence number, you can detect that the current event is out of order, e.g. currentEventNumber != lastReceivedEventNumber + 1

Once you've detected that, you can simply throw an exception. If your subscriber has a retry mechanism, it will try to process the event again in a second or so. There is a good chance that the earlier events will have been processed by then and the sequence will be correct. This approach works if out-of-order events happen rarely.

If you face this situation regularly, you need to implement a global locking mechanism that allows certain events to be processed sequentially. For example, we used sp_getapplock in MSSQL to achieve global "critical section" behaviour in certain situations. Apache ZooKeeper offers a framework for even more complicated scenarios, where multiple parts of a distributed application require something more than a simple lock.
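A rough sketch of the check-and-throw approach, assuming event numbers start at 1 and that the subscriber redelivers a message when its handler throws. `SequencedProjector` and `OutOfOrderEventException` are hypothetical names, not part of any framework:

```java
// Hypothetical exception signalling that an earlier event has not arrived yet.
final class OutOfOrderEventException extends RuntimeException {
    OutOfOrderEventException(long expected, long actual) {
        super("expected event " + expected + " but received " + actual);
    }
}

// Hypothetical read-model projector that only accepts the next event in sequence.
final class SequencedProjector {
    private long lastReceivedEventNumber = 0; // assumption: numbering starts at 1

    void handle(long currentEventNumber) {
        if (currentEventNumber != lastReceivedEventNumber + 1) {
            // Throwing rejects the message so the subscriber's retry can redeliver it later.
            throw new OutOfOrderEventException(lastReceivedEventNumber + 1, currentEventNumber);
        }
        // ... apply the event to the read model here ...
        lastReceivedEventNumber = currentEventNumber;
    }

    long lastReceivedEventNumber() {
        return lastReceivedEventNumber;
    }
}
```

Note that this sketch also rejects duplicates (an event number at or below the last one received); a real handler would probably acknowledge and skip those instead of retrying them.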

IlliakaillI answered Oct 23 '22 10:10

Timestamp based solution:

The incoming messages are:

{
 id: 1,
 timestamp: T2,
 name: Samuel
}
{
 id: 1,
 timestamp: T1,
 name: Sam,
 age: 26
}
{
 id: 1,
 timestamp: T3,
 name: Marlon Samuels,
 contact: 123
}

And what we expect to see in the database, irrespective of the order of arrival, is:

{
 id: 1,
 timestamp: T3,
 name: Marlon Samuels,
 age: 26,
 contact: 123
}

For every incoming message, do the following:

  1. Get the persisted record and compare its timestamp with that of the incoming message.
  2. Whichever has the greater timestamp is the target; the other one is the source that gets merged into it.

Now let's go through the messages:

  1. T2 arrives first: Store it in the database as it's the first one.
  2. T1 arrives next: Compare the persisted one (T2) with the incoming one (T1); T2 is the target.
  3. T3 arrives: Compare the persisted one (T2) with the incoming one (T3); T3 is the target.

The following deepMerge(src, target) should give us the resulting record:

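import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

// Copies into target every key it lacks; target's own values win on conflict.
public static JsonObject deepMerge(JsonObject source, JsonObject target) {
    for (String key : source.keySet()) {
        JsonElement srcValue = source.get(key);
        if (!target.has(key)) { // add only when target doesn't have it already
            target.add(key, srcValue);
        } else if (srcValue.isJsonObject() && target.get(key).isJsonObject()) {
            // one possible recursive handling: merge nested objects field by field
            deepMerge(srcValue.getAsJsonObject(), target.getAsJsonObject(key));
        }
        // otherwise keep target's value; adjust according to the requirement
    }
    return target;
}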
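
The whole flow can be sketched without Gson. The following is an illustration under stated assumptions rather than the full version: records are flat `Map<String, Object>` values with a numeric `timestamp` field, `TimestampMerge.merge` is a hypothetical helper, and on equal timestamps the persisted record is treated as the target:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: records are flat maps with a numeric "timestamp" field.
final class TimestampMerge {
    // Returns the merged record: the newer record wins on conflicts,
    // and fields present only in the older record are carried over.
    static Map<String, Object> merge(Map<String, Object> persisted, Map<String, Object> incoming) {
        if (persisted == null) {
            return new HashMap<>(incoming); // first message for this id
        }
        long persistedTs = ((Number) persisted.get("timestamp")).longValue();
        long incomingTs = ((Number) incoming.get("timestamp")).longValue();
        boolean incomingIsNewer = incomingTs > persistedTs;
        Map<String, Object> target = incomingIsNewer ? incoming : persisted;
        Map<String, Object> source = incomingIsNewer ? persisted : incoming;
        Map<String, Object> result = new HashMap<>(target);
        source.forEach(result::putIfAbsent); // add only what the target lacks
        return result;
    }
}
```

Feeding the three example messages through `merge` in the order T2, T1, T3 yields the expected record: name and contact from T3, with age carried over from T1.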

Let me know in the comments if you need the full version of deepMerge().

Maqbool Ahmed answered Oct 23 '22 10:10