
Event sourcing / CQRS read model - projections

I have a microservice-based application running on AWS Lambda. Two of the microservices, the most crucial ones, use event sourcing/CQRS.

Background: (this is also for me to organize my thoughts)

I'm using this library and storing events in DynamoDB and projections in AWS S3.

The write part works like a charm: each command invocation loads the current state of the aggregate from DynamoDB (by running events through a handler and/or loading a cached aggregate), decides to accept or reject the command based on some business logic, then writes to DynamoDB with KeyConditionExpression: 'aggregateId = :a AND version >= :v', where the version is a count of events processed for that aggregate. If there's a conflict, the write fails. Seems like a good system to me!
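
For concreteness, here is a minimal sketch of an optimistic-concurrency append of that shape, using the AWS SDK v3 for JavaScript. The table name, item attributes, and the conditional-put approach (rather than the exact expression quoted above) are assumptions for illustration, not the library's actual API:

```typescript
import {
  DynamoDBClient,
  PutItemCommand,
  ConditionalCheckFailedException,
} from "@aws-sdk/client-dynamodb";

const client = new DynamoDBClient({});

// Append one event for an aggregate. "events" is a hypothetical table keyed on
// (aggregateId, version); the conditional put fails if another writer already
// stored an event with this version, which is the concurrency conflict.
async function appendEvent(
  aggregateId: string,
  version: number,
  payload: object,
): Promise<boolean> {
  try {
    await client.send(new PutItemCommand({
      TableName: "events",
      Item: {
        aggregateId: { S: aggregateId },
        version: { N: String(version) },
        payload: { S: JSON.stringify(payload) },
      },
      // Reject the write if an item with this exact key already exists.
      ConditionExpression: "attribute_not_exists(aggregateId)",
    }));
    return true;
  } catch (err) {
    if (err instanceof ConditionalCheckFailedException) return false; // concurrent write won
    throw err;
  }
}
```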

Each event is then broadcast to SNS (topic name is the service name) so other services can react to the event, if they want.
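
The broadcast step is usually no more than a publish call; a sketch with AWS SDK v3, where the topic ARN and message shape are assumptions:

```typescript
import { SNSClient, PublishCommand } from "@aws-sdk/client-sns";

const sns = new SNSClient({});

// Broadcast a committed event so other services can react to it.
async function broadcastEvent(
  topicArn: string,
  event: { aggregateId: string; version: number; type: string },
): Promise<void> {
  await sns.send(new PublishCommand({
    TopicArn: topicArn,
    Message: JSON.stringify(event),
    // Message attributes let subscribers filter by event type without parsing the body.
    MessageAttributes: {
      eventType: { DataType: "String", StringValue: event.type },
    },
  }));
}
```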

The part that I really struggle with is the read. Projections are stored in S3 and tagged with the last commitId processed for each event source. When a read query comes in, it loads the entire projected state from S3 (for all aggregates), queries the event sources for all newer events, computes the latest state (again, for all aggregates, writing an updated object back to S3 if it's newer), and returns the relevant parts of the state based on the query params.

My problem: (or one of them)

I think I'm doing projections wrong.

Most of my projections only group IDs by an important attribute, so the files stay relatively small. But I also need a way to retrieve an individual aggregate. Using projections for that seems crazy, because I need to load the entire state each time (i.e. every projected aggregate), apply any new events to it, and then retrieve the record I want (which may not have even changed).

This is what I'm doing now, and it performs fine (<100k records), but I can't imagine it will scale much further.

The other problem is queries. I need to build a projection mapping each value to its matching aggregateIds for every attribute I need to query on! There has to be a better way!
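
To make that concrete, each of those per-attribute lookup projections amounts to a document shaped roughly like this (the names are illustrative only):

```typescript
// One projection document per queryable attribute, e.g. "status" or "ownerId".
// Each maps an attribute value to the aggregate ids that currently have it.
interface AttributeIndexProjection {
  attribute: string;                 // e.g. "status"
  lastCommitId: string;              // last event commit folded into this document
  values: Record<string, string[]>;  // e.g. { "active": ["a1", "a7"], "archived": ["a3"] }
}
```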

No matter which way I think about this problem, a projection always needs the entire current state plus any new events before it can return even a single record that hasn't changed.

asked Nov 15 '17 by joshblour

2 Answers

I think I'm doing projections wrong.

I think so too; it sounds like you have your queries coupled to your projections.

When a read query comes in, it loads the entire projected state from S3 (for all aggregates), queries the event sources for all newer events, computes the latest state

Yeah, that sounds like a mess. Or more specifically, that sounds like the query is triggering the work to be done by the projection.

If you can decouple the queries from the projections, then things get easier. The basic idea is that your queries don't describe the current state; they describe the state as of the last time the projection ran.

Same idea, different spelling: you answer queries from the documents that you cache in S3. When new events are detected, your projections run, load the new data as needed, compute the new document, and replace the entries in the cache.
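
As a sketch of that decoupling, an event-triggered projector could look roughly like the following, assuming an SNS-subscribed Lambda, a hypothetical projection document in S3, and a projection-specific applyEvent reducer. Real code would also need to deal with ordering and idempotency:

```typescript
import { S3Client, GetObjectCommand, PutObjectCommand } from "@aws-sdk/client-s3";
import type { SNSEvent } from "aws-lambda";

const s3 = new S3Client({});
const BUCKET = "projections"; // assumed bucket name

// Hypothetical projection document: current state plus the last commit folded in.
interface ProjectionDoc {
  lastCommitId: string;
  state: Record<string, unknown>;
}

// Runs when events arrive, not when queries arrive: fold the new events into the
// cached document and write it back. Queries then read the document as-is.
export async function handler(snsEvent: SNSEvent): Promise<void> {
  const doc = await loadDoc("by-status.json");
  for (const record of snsEvent.Records) {
    const event = JSON.parse(record.Sns.Message);
    applyEvent(doc, event);           // projection-specific reducer (hypothetical)
    doc.lastCommitId = event.commitId;
  }
  await s3.send(new PutObjectCommand({
    Bucket: BUCKET,
    Key: "by-status.json",
    Body: JSON.stringify(doc),
  }));
}

async function loadDoc(key: string): Promise<ProjectionDoc> {
  const res = await s3.send(new GetObjectCommand({ Bucket: BUCKET, Key: key }));
  return JSON.parse(await res.Body!.transformToString());
}

function applyEvent(doc: ProjectionDoc, event: { commitId: string; type: string }): void {
  // Fold one event into the document; the actual logic depends on the projection.
}
```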

I think of a triangle:

  • Commands bring information from the outside to the book of record
  • Projections bring information from the book of record to the cache
  • Queries bring information from the cache to the outside world

where each leg of the triangle runs asynchronously with the others.

I suggest you work backwards from the queries: what documents do you need to support each query? What are the latency targets you have to beat? Then you start balancing tradeoffs: for this new query, do I create the result from the existing documents, or do I need a new document built at a finer grain?

If I understand correctly, I should be triggering the projection updates as events come in, instead of in aggregate when the query is made. That saves me from querying the event store for new events on every query.

Yes, and... events are only one way of triggering; you could also have the projection processes triggered by a clock (check every 15 minutes to see if we need to update) or at the whim of a human operator (hmm, it looks like your account balance is stale, let me try to update that for you). More than one way to do it, and you can mix and match strategies.

I would still need to load the entire state, both when updating the projection, and when loading a single aggregate.

Not necessarily. There's no rule that says you can't use the previously cached representation as a starting point, and then pull from the book of record only the changes that you need.

For instance, suppose you are building a view that combines aggregates A{id:7} and B{id:9}. You grab the cached copy, and look in its meta data (where you put it on your previous write) and find something inside it like metadata:{A:{id:7, version:21}, B:{id:9, version:19}}. Now you only need to load the events after the ones you used last time, update your local copy in memory, update the local copy of the metadata, and push the lot to the cache.
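
A rough sketch of that incremental refresh, using the metadata layout described above, with hypothetical callbacks standing in for the event store reader and the projection's reducer:

```typescript
type Event = { version: number; type: string; data: unknown };

// Cached view document carrying the per-aggregate versions it was built from,
// e.g. metadata: { A: { id: 7, version: 21 }, B: { id: 9, version: 19 } }.
interface CachedView {
  metadata: Record<string, { id: number; version: number }>;
  state: Record<string, unknown>;
}

// Refresh the cached view by pulling only events newer than the versions recorded
// in its metadata, instead of replaying the whole history.
async function refreshView(
  view: CachedView,
  loadEventsAfter: (aggregate: string, id: number, version: number) => Promise<Event[]>,
  apply: (view: CachedView, aggregate: string, event: Event) => void,
): Promise<CachedView> {
  for (const [aggregate, meta] of Object.entries(view.metadata)) {
    for (const event of await loadEventsAfter(aggregate, meta.id, meta.version)) {
      apply(view, aggregate, event);
      meta.version = event.version;  // remember how far we have folded
    }
  }
  return view; // caller writes the updated document (and metadata) back to the cache
}
```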

answered Sep 28 '22 by VoiceOfUnreason


I'm not familiar with your technical infrastructure, but the way I implement projections is as follows:

Each domain event has a global sequence number that spans all aggregate roots. A projection is a read model that has an arbitrary name and a last-processed position, represented by that global sequence number. I can add a new projection at any time, along with its event handlers, and it will start at position 0. I can clear a projection at any time and set its position back to 0. I can also add a new projection that will replace an existing one, let it build even if that takes days, and then remove the old one.

There is a service that monitors the projections and uses the event store almost like a queue. The projection service checks for events with global ids after the current position, hands those off to the handlers, and then updates the position. This is where your projection may even filter on event types to improve performance.
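
With hypothetical store and projection interfaces standing in for whatever your infrastructure provides, the core of that loop might look like this (TypeScript here only for illustration):

```typescript
interface StoredEvent {
  globalSequence: number;
  type: string;
  data: unknown;
}

interface EventStore {
  // Events across all aggregates with a global sequence number greater than `after`.
  readAfter(after: number, limit: number): Promise<StoredEvent[]>;
}

interface Projection {
  name: string;
  position: number;                      // last processed global sequence number
  handles: Set<string>;                  // event types this projection cares about
  handle(event: StoredEvent): Promise<void>;
  savePosition(position: number): Promise<void>;
}

// One pass of the projection service: read events past the projection's position,
// dispatch the ones it handles, then persist the new position.
async function processProjection(store: EventStore, projection: Projection): Promise<void> {
  const batch = await store.readAfter(projection.position, 100);
  for (const event of batch) {
    if (projection.handles.has(event.type)) {
      await projection.handle(event);    // filtering on event types improves performance
    }
    projection.position = event.globalSequence;
  }
  await projection.savePosition(projection.position);
}
```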

That is the basic idea. Your projections are then what you query. Once a projection has caught up to the "head" of the event store, new events are trickle-fed into it as they arrive.

How that will translate into your technical space I'm not quite sure. I have a bit of an experiment in C# called Shuttle.Recall, if you'd like to have a look to get some ideas.

answered Sep 27 '22 by Eben Roux