Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why do sagas (aka, process managers) contain an internal state and why are they persisted to the event store?

A lot of articles on CQRS imply that sagas have an internal state and must be saved to the event store. I don't see why this is necessary.

For example, say I have three aggregates: Order, Invoice and Shipment. When a customer places an order, the order process starts. However, the shipment cannot be sent until the invoice has been paid and the shipment has first been prepared.

  1. A customer places an order with the PlaceOrder command.
  2. The OrderCommandHandler calls OrderRepository::placeOrder().
  3. The OrderRepository::placeOrder() method returns an OrderPlaced event, which is stored in the EventStore and sent along the EventBus.
  4. The OrderPlaced event contains the orderId and pre-allocates a invoiceId and shipmentId.
  5. The OrderProcess ("saga") receives the OrderPlaced event, creating the invoice and preparing the shipment if necessary (achieving idempotence in the event handler). 6a. At some point in time, the OrderProcess receives the InvoicePaid event. It checks to see whether the shipment has been prepared by looking up the shipment in the ShipmentRepository, and if so, sends the shipment. 6b. At some point in time, the OrderProcess receives the ShipmentPrepared event. It chekcs to see whether the invoice has been paid by looking up the invoice in the InvoiceRepository, and if so, sends the shipment.

To all the experienced DDD/CQRS/ES gurus out there, can you please tell me what concept I'm missing and why this design of a "stateless saga" will not work?

class OrderCommandHandler {
    public function handle(PlaceOrder $command) {
        $event = $this->orderRepository->placeOrder($command->orderId, $command->customerId, ...);
        $this->eventStore->store($event);
        $this->eventBus->emit($event);
    }
}

class OrderRepository {
    public function placeOrder($orderId, $customerId, ...) {
        $invoiceId = randomString();
        $shipmentId = randomString();
        return new OrderPlaced($orderId, $customerId, $invoiceId, $shipmentId);
    }
}

class InvoiceRepository {
    public function createInvoice($invoiceId, $customerId, ...) {
        // Etc.
        return new InvoiceCreated($invoiceId, $customerId, ...);
    }
}

class ShipmentRepository {
    public function prepareShipment($shipmentId, $customerId, ...) {
        // Etc.
        return new ShipmentPrepared($shipmentId, $customerId, ...);
    }
}

class OrderProcess {
    public function onOrderPlaced(OrderPlaced $event) {
        if (!$this->invoiceRepository->hasInvoice($event->invoiceId)) {
            $invoiceEvent = $this->invoiceRepository->createInvoice($event->invoiceId, $event->customerId, $event->invoiceId, ...);
            $this->eventStore->store($invoiceEvent);
            $this->eventBus->emit($invoiceEvent);
        }

        if (!$this->shipmentRepository->hasShipment($event->shipmentId)) {
            $shipmentEvent = $this->shipmentRepository->prepareShipment($event->shipmentId, $event->customerId, ...);
            $this->eventStore->store($shipmentEvent);
            $this->eventBus->emit($shipmentEvent);
        }
    }

    public function onInvoicePaid(InvoicePaid $event) {
        $order = $this->orderRepository->getOrders($event->orderId);
        $shipment = $this->shipmentRepository->getShipment($order->shipmentId);
        if ($shipment && $shipment->isPrepared()) {
            $this->sendShipment($shipment);
        }
    }

    public function onShipmentPrepared(ShipmentPrepared $event) {
        $order = $this->orderRepository->getOrders($event->orderId);
        $invoice = $this->invoiceRepository->getInvoice($order->invoiceId);
        if ($invoice && $invoice->isPaid()) {
            $this->sendShipment($this->shipmentRepository->getShipment($order->shipmentId));
        }
    }

    private function sendShipment(Shipment $shipment) {
        $shipmentEvent = $shipment->send();
        $this->eventStore->store($shipmentEvent);
        $this->eventBus->emit($shipmentEvent);
    }
}
like image 588
magnus Avatar asked Dec 16 '15 05:12

magnus


2 Answers

Commands can fail.

That's the primary problem; the entire reason we have aggregates in the first place, is so that they can protect the business from invalid state changes. So what happens in onOrderPlaced() if the createInvoice command fails?

Furthermore (though somewhat related) you are lost in time. Process managers handle events; events are things that have already happened in the past. Ergo -- process managers are running in the past. In a very real sense, they can't even talk to anyone that has seen a more recent event than the one that they are processing right now (in fact, they might be the first handler to see this event, meaning everybody else is a step in the past).

This is why you can't run commands synchronously; your event handler is in the past, and the aggregate can't protect its invariant unless it is running in the present. You need the asynchronous dispatch to get the command running against the correct version of the aggregate.

Next problem: when you dispatch the command asynchronously, you can't directly observe the result. It might fail, or get lost en route, and the event handler won't know. The only way that it can determine that the command succeeded is by observing a generated event.

A consequence is that the process manager cannot distinguish a command that failed from a command that succeeded (but the event hasn't become visible yet). To support a finite sla, you need a timing service that wakes up the process manager from time to time to check on things.

When the process manager wakes up, it needs state to know if it has already finished the work.

With state, everything is so much simpler to manage. The process manager ccan re-issue possibly lost commands to be sure that they get through, without also flooding the domain with commands that have already succeeded. You can model the clock without throwing clock events into the domain itself.

like image 134
VoiceOfUnreason Avatar answered Oct 07 '22 15:10

VoiceOfUnreason


What you are referring to seems to be along the lines of orchestration (with a process manager) vs choreography.

Choreography works absolutely fine but you will not have a process manager as a first-class citizen. Each command handler will determine what to do. Even my current project (December 2015) uses choreography quite a bit with a webMethods integration broker. Messages may even carry some of the state along with them. However, when anything needs to take place in parallel your are rather shafted.

A relevant service orchestration vs choreography question demonstrates these concepts quite nicely. One of the answers contains a nice pictorial representation and, as stated in the answer, more complex interactions typically require state for the process.

I find that you typically will require state when interacting with services and endpoints beyond your control. Human interaction, such as authorizations, also require this type of state.

If you can get away with not having state specifically for a process manager it may be OK. However, later on you may run into issues. For example, some low-level/core/infrastructure service may span across various processes. This may cause issues in a choreography scenario.

like image 34
Eben Roux Avatar answered Oct 07 '22 17:10

Eben Roux