
Event sourcing - event replaying

I have been reading about the Event Sourcing pattern, which can be very useful if you would like to rebuild your system state.

However, what if I need to run the event rebuild while servicing new incoming requests? Is there any particular pattern or best practice for that scenario?

So, instead of scheduling system downtime, how do I ensure that new incoming requests will not corrupt my system while it is replaying? Event synchronisation and sequence are really important for my system: it involves updating DB records that depend on the event sequence.

Any thoughts?

Nyamnyam asked Nov 22 '16

2 Answers

Note: for this example, all IDs are 6 random alphanumeric characters; think of them as UUIDs or SHA-1 hashes, for example.

If you have this in the events:

WriteIndex | EventId | Type             | streamId | Data    
-------------------------------------------------------------------------------
1          | qcwbf2  | car.created      | hrxs21   | { by: Alice, color: blue }
2          | e1owui  | car.repainted    | hrxs21   | { color: red }
3          | fjr4io  | car.created      | tye24p   | { by: Alice, color: blue }
4          | fhreui  | customer.created | b2dhuw   | { name: Bob }
5          | urioe7  | car.sold         | hrxs21   | { to: b2dhuw }
6          | k9d2vq  | customer.renamed | b2dhuw   | { name: Charlie }
-------------------------------------------------------------------------------

And this in your projections (after event 6):

CarId  | Creator | Color | Sold | Customer | CustomerId
-------------------------------------------------------
hrxs21 | Alice   | red   | yes  | Bob      | b2dhuw
tye24p | Alice   | blue  | no   |          | 
-------------------------------------------------------

CustomerId | Name
---------------------
b2dhuw     | Charlie
---------------------
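
To make the projector concrete, here is a minimal sketch in Python, using plain dicts in place of real tables; the names and structure are my own illustration, not a prescribed implementation. The comment under "customer.renamed" marks exactly the omission described next:

events = [
    {"writeIndex": 1, "eventId": "qcwbf2", "type": "car.created",      "streamId": "hrxs21", "data": {"by": "Alice", "color": "blue"}},
    {"writeIndex": 2, "eventId": "e1owui", "type": "car.repainted",    "streamId": "hrxs21", "data": {"color": "red"}},
    {"writeIndex": 3, "eventId": "fjr4io", "type": "car.created",      "streamId": "tye24p", "data": {"by": "Alice", "color": "blue"}},
    {"writeIndex": 4, "eventId": "fhreui", "type": "customer.created", "streamId": "b2dhuw", "data": {"name": "Bob"}},
    {"writeIndex": 5, "eventId": "urioe7", "type": "car.sold",         "streamId": "hrxs21", "data": {"to": "b2dhuw"}},
    {"writeIndex": 6, "eventId": "k9d2vq", "type": "customer.renamed", "streamId": "b2dhuw", "data": {"name": "Charlie"}},
]

cars: dict = {}       # carId -> row of the cars projection (the "cache")
customers: dict = {}  # customerId -> row of the customers projection

def apply_event(event: dict) -> None:
    """Fold one event into the projections, in WriteIndex order."""
    t, sid, d = event["type"], event["streamId"], event["data"]
    if t == "car.created":
        cars[sid] = {"creator": d["by"], "color": d["color"],
                     "sold": False, "customer": None, "customerId": None}
    elif t == "car.repainted":
        cars[sid]["color"] = d["color"]
    elif t == "car.sold":
        cars[sid]["sold"] = True
        cars[sid]["customerId"] = d["to"]
        cars[sid]["customer"] = customers[d["to"]]["name"]  # denormalised name
    elif t == "customer.created":
        customers[sid] = {"name": d["name"]}
    elif t == "customer.renamed":
        customers[sid]["name"] = d["name"]
        # BUG (described below): the cars projection is NOT updated here, so
        # the denormalised Customer column still says "Bob" after event 6.

for e in events:
    apply_event(e)   # leaves cars["hrxs21"]["customer"] == "Bob", as above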

Imagine you have an error in the cars projection because your car projector did not properly listen to the "customer.renamed" event.

You rewrite all the projectors and want to replay.

Your fear

You replay events and get to this:

CarId  | Creator | Color | Sold | Customer | CustomerId
-------------------------------------------------------
hrxs21 | Alice   | red   | yes  | Charlie  | b2dhuw
tye24p | Alice   | blue  | no   |          | 
-------------------------------------------------------

But in parallel, while you are "rebuilding the car cache" (the projections are nothing more than a cache), three new events come in:

WriteIndex | EventId | Type             | streamId | Data    
-------------------------------------------------------------------------------
7          | je3i32  | car.repainted    | hrxs21   | { color: orange }
8          | 8c227x  | customer.created | wuionx   | { name: Dan }
9          | e39jc2  | car.sold         | tye24p   | { to: wuionx }

So it "seems" the new rebuilt fresh cache never "gets to the current state" as now the car of Charlie (former Bob) is now orange instead of red, a new customer has been created, and car number wuionx is now owned by Dan.

Your solution

  1. Mark your cache data with either an "index" (caution: it needs to be carefully designed) or a "timestamp" (caution with corrective events injected at past dates!).
  2. When reading your cache, "make sure" you "apply" the "latest changes" pending to be applied, in SYNC mode (not ASYNC).

Reason why: rebuilding many thousands of events can take time, but rebuilding just a few dozen should be lightning fast.

Quick-n-Dirty solution

So... make your "replayer" keep these tables instead (I'm assuming the WriteIndex is reliable enough for inspiration here, but in practice I'd probably use something other than the WriteIndex; it's just for illustration):

CarId  | Creator | Color | Sold | Customer | CustomerId | LatestChangedBy
-------------------------------------------------------------------------
hrxs21 | Alice   | red   | yes  | Charlie  | b2dhuw     | 6
tye24p | Alice   | blue  | no   |          |            | 3
-------------------------------------------------------------------------

So when you go to "consume" car tye24p, you see its latest update was caused by event 3, and you can "replay" from 4 to the end, listening only for this aggregate, so you'll end up with this:

CarId  | Creator | Color | Sold | Customer | CustomerId | LatestChangedBy
-------------------------------------------------------------------------
hrxs21 | Alice   | red   | yes  | Charlie  | b2dhuw     | 6
tye24p | Alice   | blue  | yes  | Dan      | wuionx     | 9
-------------------------------------------------------------------------

As you can see, this is inefficient because you are replaying events 4 to 6 "again" when you have already replayed them.
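
A sketch of this quick-n-dirty catch-up, reusing events, cars, customers and apply_event from the sketch above; latest_changed_by stands in for the LatestChangedBy column, and load_events_after is an assumed store query, not a real API:

# The three events that arrived during the rebuild (7..9 above):
events += [
    {"writeIndex": 7, "eventId": "je3i32", "type": "car.repainted",    "streamId": "hrxs21", "data": {"color": "orange"}},
    {"writeIndex": 8, "eventId": "8c227x", "type": "customer.created", "streamId": "wuionx", "data": {"name": "Dan"}},
    {"writeIndex": 9, "eventId": "e39jc2", "type": "car.sold",         "streamId": "tye24p", "data": {"to": "wuionx"}},
]

latest_changed_by = {"hrxs21": 6, "tye24p": 3}   # per-row replay markers

def load_events_after(write_index: int) -> list:
    """Assumed store query: events with WriteIndex > write_index, in order."""
    return [e for e in events if e["writeIndex"] > write_index]

def read_car(car_id: str) -> dict:
    """SYNC per-aggregate catch-up before serving the read."""
    marker = latest_changed_by[car_id]
    for event in load_events_after(marker):       # replay "marker+1 .. end"
        # Apply customer events (cheap, keeps denormalised names resolvable)
        # plus this one car's events; skip events of other cars.
        if event["type"].startswith("customer.") or event["streamId"] == car_id:
            apply_event(event)
        marker = event["writeIndex"]              # advance past skipped events too
    latest_changed_by[car_id] = marker
    return cars[car_id]

# read_car("tye24p") re-scans events 4..6 before reaching 7..9 -- exactly the
# redundancy pointed out above -- and ends with the marker at 9.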

A bit better solution

Have a global replayer counter:

CarId  | Creator | Color | Sold | Customer | CustomerId
-------------------------------------------------------
hrxs21 | Alice   | red   | yes  | Charlie  | b2dhuw
tye24p | Alice   | blue  | no   |          | 
-------------------------------------------------------

ReplayerMetaData
-------------------------------------------------------
lastReplayed: 6
-------------------------------------------------------

When you want to access anything, you first do a SYNC "quick update" of "any new events pending to be replayed".

If you want to access car tye24p, you see there are events up to "index 9" but you have only replayed up to 6. So, "before reading", you force an "apply all pending" and replay just 7, 8 and 9. You end up with this car cache table:

CarId  | Creator | Color  | Sold | Customer | CustomerId
--------------------------------------------------------
hrxs21 | Alice   | orange | yes  | Charlie  | b2dhuw
tye24p | Alice   | blue   | yes  | Dan      | wuionx
--------------------------------------------------------

ReplayerMetaData
-------------------------------------------------------
lastReplayed: 9
-------------------------------------------------------
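
The same mechanism with the global pointer, again reusing the sketch's helpers; replayer_meta stands in for the ReplayerMetaData table (names assumed):

replayer_meta = {"lastReplayed": 6}

def catch_up() -> None:
    """Apply, in WriteIndex order, every event newer than the global pointer."""
    for event in load_events_after(replayer_meta["lastReplayed"]):
        apply_event(event)                       # full projector, all aggregates
        replayer_meta["lastReplayed"] = event["writeIndex"]

def read_car(car_id: str) -> dict:   # replaces the per-row version above
    catch_up()        # SYNC "quick update": usually zero or only a few events
    return cars[car_id]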

Overall

With this solution you:

  • You may do an infinite number of trials before switching the read model.
  • You may stay "online" while you do all the trials.
  • When your system is "ready to go" for the final rebuild (imagine it takes 1 hour to process 1 million events), you just run it, BUT with the system online.
  • After the mega rebuild, say your lastReplayed = 1,000,000. If 2,000 new events came in during that hour, you just "replay those latest ones" again.
  • Imagine those 2,000 take 5 minutes; your pointer is now at 1,002,000. Imagine 300 more events came in during those 5 minutes:
  • Replay "only those latest ones" and your pointer will be at 1,002,300.
  • Once you are "nearly caught up" (no matter if there's still a 50-event gap), you just switch the read model to the new one by a simple configuration flag (see the sketch after this list). This means you don't need a full deploy: you must already have deployed a version able to read from either model, so switching is "immediate".
  • After switching, you just "ensure" in your reads that you "force apply the latest events synchronously".
  • This will affect only the first read and should take 1 or 2 seconds at most... the next reads will most probably already be in sync, so checking that gap carries no performance penalty; it'll just say "you need to update 0 more events" and be done.
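
A possible shape for that configuration-flag switch, assuming an environment variable and in-memory models purely for illustration:

import os

old_cars: dict = {}   # stand-in for the legacy projection, untouched meanwhile
new_cars = cars       # the freshly rebuilt projection from the sketches above

def read_car_row(car_id: str) -> dict:
    if os.environ.get("READ_MODEL", "old") == "new":   # the "immediate" switch
        catch_up()               # force-apply the latest events synchronously;
        return new_cars[car_id]  # only the first read pays the small gap
    return old_cars[car_id]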

Hope this helps!

Xavi Montero answered Sep 28 '22


I've used CQRS+ES in a similar case. I created a projection with prepared data that I could only update, not rebuild, and on every query I built the result from it quickly.

If you need to execute long operations (like an update in the DB), use sagas: generate an event -> saga -> update the projection after the saga ends and generate event2.
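
A rough sketch of that event -> saga -> projection -> event2 flow, with a hypothetical bus and saga class (no real CQRS library is implied):

from dataclasses import dataclass, field

@dataclass
class Bus:
    published: list = field(default_factory=list)
    def publish(self, event: dict) -> None:
        self.published.append(event)          # stand-in for a real message bus

def update_projection(event: dict) -> None:
    """Hypothetical long-running work, e.g. the DB update."""

class UpdateProjectionSaga:
    def handle(self, event: dict, bus: Bus) -> None:
        update_projection(event)              # the slow step runs inside the saga
        bus.publish({                         # "event2" is emitted only when done
            "type": "projection.updated",
            "causationId": event.get("eventId"),
        })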

Of course, there will be some delay between the event's arrival and the projection update.

It would be very interesting to learn more about your system and whether such a variant is good enough for you.

A Ralkov answered Sep 28 '22