Event publisher for ASP.NET Web Api

I have started to work with micro-services and I need to create an event publishing mechanism.

I plan to use Amazon SQS.

The idea is quite simple. I store events in the database in the same transaction as aggregates. If a user changes their email, a UserChangedEmail event is stored in the database.

I also have an event handler, such as UserChangedEmailHandler, which will (in this case) be responsible for publishing this event to an SQS queue, so other services know that the user changed their email.

My question is, what is the recommended practice for achieving this? Should I have some kind of background timed process which scans the events table and publishes events to SQS? Can this be a process within the WebApi application (preferable), or should it be a separate process?

One of the ideas was to use Hangfire, but it does not support cron jobs under a minute.

Any suggestions?

EDIT:

As suggested in one of the answers, I've looked into NServiceBus. One of the examples on the NServiceBus page shows the core of my concern.

In their example, they create a log entry saying that an order has been placed. What if the log or database entry is successfully committed, but the publish breaks and the event never gets published?

Here's the code for the event handler:

public class PlaceOrderHandler :
    IHandleMessages<PlaceOrder>
{
    static ILog log = LogManager.GetLogger<PlaceOrderHandler>();
    IBus bus;

    public PlaceOrderHandler(IBus bus)
    {
        this.bus = bus;
    }

    public void Handle(PlaceOrder message)
    {
        log.Info($"Order for Product:{message.Product} placed with id: {message.Id}");
        log.Info($"Publishing: OrderPlaced for Order Id: {message.Id}");

        var orderPlaced = new OrderPlaced
        {
            OrderId = message.Id
        };
        bus.Publish(orderPlaced); // <-- my concern
    }
}
Robert asked Dec 14 '22 03:12


1 Answer

Off the Shelf Suggestions

Rather than rolling your own, I recommend looking into off the shelf products, as there is a lot of complexity here that will not be apparent at the outset, e.g.

  • Managing the event subscriber list - an SQS queue is more appropriately paired with an event consumer than with an event producer, because once a message is consumed it is no longer available on the queue. So if you want to support multiple subscribers for a given event (which is a massive benefit of event-driven architectures), how do you know which SQS queues to push the event message onto when it is first raised?
  • Retry semantics, error forwarding queues - handling temporary errors due to ephemeral infrastructure issues vs permanent errors due to business logic semantic issues
  • Audit trails of which messages were raised when and sent where
  • Security of messages sent via SQS - does your business case require them to be encrypted? SQS is an application service offered by Amazon, and at the time of writing it did not provide storage-level encryption (Amazon has since added server-side encryption for queues)
  • Size of messages - SQS has a message size limit so you may eventually need to handle out-of-band transmission of large messages

And that's just off the top of my head...
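
To make the retry point a bit more concrete, here is a minimal in-memory sketch of the distinction between temporary and permanent failures. Everything in it (TransientException, RetryingDispatcher, the ErrorQueue list) is a hypothetical name made up for illustration; it is not the API of SQS or of any of the frameworks mentioned below, and a real system would also back off between attempts.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical exception type marking failures that are worth retrying.
public class TransientException : Exception
{
    public TransientException(string message) : base(message) { }
}

public static class RetryingDispatcher
{
    // In-memory stand-in for an error queue that holds messages needing manual attention.
    public static readonly List<string> ErrorQueue = new List<string>();

    // Returns true if the handler eventually succeeded.
    public static bool Dispatch(string message, Action<string> handler, int maxAttempts)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                handler(message);
                return true;
            }
            catch (TransientException)
            {
                // Infrastructure hiccup: fall through and try again.
            }
            catch (Exception)
            {
                // Business/semantic failure: retrying will not help, so stop.
                break;
            }
        }
        ErrorQueue.Add(message);
        return false;
    }
}

public static class RetryDemo
{
    public static void Main()
    {
        int calls = 0;
        bool ok = RetryingDispatcher.Dispatch("UserChangedEmail:42", m =>
        {
            calls++;
            if (calls < 3) throw new TransientException("queue unavailable");
        }, maxAttempts: 5);
        Console.WriteLine($"delivered={ok} after {calls} attempts");

        bool bad = RetryingDispatcher.Dispatch("UserChangedEmail:bad", m =>
        {
            throw new InvalidOperationException("unknown user");
        }, maxAttempts: 5);
        Console.WriteLine($"delivered={bad}, error queue size={RetryingDispatcher.ErrorQueue.Count}");
    }
}
```

The first message succeeds on its third attempt; the second fails once with a non-transient error and goes straight to the error queue rather than burning through the remaining attempts.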

A few off the shelf systems that would assist:

  • NServiceBus provides a framework for managing command and event messaging, and it has a plugin framework permitting flexible transport types - NServiceBus.SQS offers SQS as a transport.
    • Offers comprehensive and flexible retry, audit and error handling
    • Opinionated use of commands vs events (command messages say "Do this" and are sent to a single service for processing, event messages say "Something happened" and are sent to an arbitrary number of flexible subscribers)
    • Outbox pattern provides transactionally consistent messaging even with non-transactionally consistent transports, such as SQS
    • Currently the SQS plugin uses default NServiceBus subscriber persistence, which requires an SQL Server for storing the event subscriber list (see below for an option that leverages SNS)
    • Built in support for sagas, offering a framework to ensure multi transaction eventual consistency with rollback via compensating actions
    • Timeouts supporting scheduled message handling
    • Commercial offering, so not free, but many plugins/extensions are open source
  • MassTransit
    • Doesn't support SQS off the shelf, but does support Azure Service Bus and RabbitMq, so could be an alternative for you if that is an option
    • Similar offering to NServiceBus, but not 100% the same - NServiceBus vs MassTransit offers a comprehensive comparison
    • Fully open source/free
  • JustSaying
    • A light-weight open source messaging framework designed specifically for SQS/SNS-based messaging
    • SNS topic per event, SQS queue per microservice, uses native SNS-to-SQS queue subscription to achieve fanout
    • Open source and free
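
The "topic per event, queue per microservice" fanout idea can be illustrated with a small in-memory sketch. The Topic class below is a hypothetical stand-in I've made up for this purpose; it does not call SNS or SQS and is not how JustSaying is implemented. It only shows why each subscriber needs its own queue.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for a topic that copies each message to every subscribed queue.
public class Topic
{
    private readonly List<Queue<string>> subscribers = new List<Queue<string>>();

    public void Subscribe(Queue<string> queue) => subscribers.Add(queue);

    public void Publish(string message)
    {
        foreach (var queue in subscribers)
            queue.Enqueue(message);
    }
}

public static class FanoutDemo
{
    public static void Main()
    {
        var userChangedEmail = new Topic();      // one topic per event type
        var billingQueue = new Queue<string>();  // one queue per consuming service
        var mailingQueue = new Queue<string>();
        userChangedEmail.Subscribe(billingQueue);
        userChangedEmail.Subscribe(mailingQueue);

        // The publisher only knows about the topic, not about who is listening.
        userChangedEmail.Publish("user 42 changed email");

        // Consuming from one queue does not remove the other service's copy.
        Console.WriteLine(billingQueue.Dequeue());
        Console.WriteLine($"mailing queue still holds {mailingQueue.Count} message(s)");
    }
}
```

The publisher never has to track the subscriber list itself, which is the problem raised in the first bullet of this answer.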

There may be others, and I have the most personal experience with NServiceBus, but I strongly recommend looking into the off the shelf solutions - they will free you up to start designing your system in terms of business events, rather than worrying about the mechanics of event transmission.

Even if you do want to build your own as a learning exercise, reviewing how the above work may give you some tips on what's needed for reliable event driven messaging.

Transactional Consistency and the Outbox Pattern

The question has been edited to ask what happens if parts of the operation succeed but the publish operation fails. I've seen this referred to as the transactional consistency of the messaging, and it generally means that within a transaction, either all business side effects are committed, or none are. Business side effects may mean:

  • Database record updated
  • Another database record deleted
  • Message published to a message queue
  • Email sent

You generally don't want an email sent or a message published if the database operation failed; likewise, you don't want the database operation committed if the message publish failed.

So how to ensure consistency of messaging?

NServiceBus handles this in one of two ways:

  1. Use a transactionally consistent message transport, such as MSMQ.
    1. MSMQ is able to make use of Microsoft's DTC (Distributed Transaction Coordinator), and DTC can enlist the publishing of messages in a distributed transaction with SQL Server updates - this means that if your business transaction fails, your publish operation will be rolled back, and vice versa
  2. The Outbox Pattern
    1. With the outbox pattern, messages are not dispatched immediately - they are added to an Outbox table in a database, ideally the same database as your business data, as part of the same transaction
    2. AFTER the transaction is committed, it attempts to dispatch each message, and only removes it from the outbox on successful dispatch
    3. In the event of a failure of the system after dispatch but before delete, the message will be transmitted a second time. To compensate for this, when Outbox is enabled, NServiceBus will also do de-duplication of inbound messages, by maintaining a record of all inbound messages and discarding duplicates.
    4. De-duplication is especially useful with Amazon SQS, as it is itself eventually consistent, and the same messages may be received twice.
    5. This is not far from the original concept in your question, but there are differences:
      1. You were envisaging a background timed process to scan the events table (aka Outbox table) and publish events to SQS
      2. NServiceBus executes handlers within a pipeline - with Outbox, the dispatch of messages to the transport (aka pushing messages into an SQS queue) is simply one of the last steps in the pipeline. So - whenever a message is handled, any outbound messages generated during the handling will be dispatched immediately after the business transaction is committed - no need for a timed scan of the events table.
    6. Note: Outbox only works when there is an ambient NServiceBus handler transaction, i.e. when you are handling a message within the NServiceBus pipeline. This will NOT be the case in some contexts, e.g. a WebAPI request pipeline. For this reason, NServiceBus recommends using your API request to send a single command message only, and then combining business data operations with further messaging within a transactionally consistent command handler in a backend endpoint service. Note, though, that point 3 in their doc is more relevant to the MSMQ transport than to SQS.
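
If it helps to see the outbox steps in code, here is a rough in-memory sketch. It is not NServiceBus code: FakeDatabase, OutboxMessage, OutboxDispatcher and Receiver are all hypothetical names, and the "transaction" is only simulated by writing the business data and the outbox row in the same method. In a real system those two writes would share one database transaction.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class OutboxMessage
{
    public Guid Id { get; } = Guid.NewGuid();
    public string Body { get; }
    public OutboxMessage(string body) { Body = body; }
}

// In-memory stand-in for a database holding business data and the outbox table.
public class FakeDatabase
{
    public Dictionary<int, string> UserEmails { get; } = new Dictionary<int, string>();
    public List<OutboxMessage> Outbox { get; } = new List<OutboxMessage>();

    // Stand-in for one transaction: the business change and the outbox row are written together.
    public void ChangeEmail(int userId, string email)
    {
        UserEmails[userId] = email;
        Outbox.Add(new OutboxMessage($"UserChangedEmail:{userId}:{email}"));
    }
}

// Receiving side: remembers message ids so a redelivered message is handled only once.
public class Receiver
{
    private readonly HashSet<Guid> seen = new HashSet<Guid>();
    public List<string> Handled { get; } = new List<string>();

    public void Receive(OutboxMessage message)
    {
        // HashSet.Add returns false for an id that was already recorded.
        if (!seen.Add(message.Id)) return;
        Handled.Add(message.Body);
    }
}

public static class OutboxDispatcher
{
    // Runs after commit: send each pending message, delete it only once sending succeeded.
    public static void DispatchPending(FakeDatabase db, Action<OutboxMessage> send)
    {
        foreach (var message in db.Outbox.ToList())
        {
            try { send(message); }
            catch (Exception) { continue; } // leave the row in place for the next attempt
            db.Outbox.Remove(message);
        }
    }
}

public static class OutboxDemo
{
    public static void Main()
    {
        var db = new FakeDatabase();
        var receiver = new Receiver();
        db.ChangeEmail(42, "new@example.com");

        // Simulate a failure after the message was sent but before it was deleted.
        OutboxDispatcher.DispatchPending(db, m =>
        {
            receiver.Receive(m);
            throw new Exception("crash after send");
        });

        // The next attempt resends the same message; the receiver discards the duplicate.
        OutboxDispatcher.DispatchPending(db, receiver.Receive);
        Console.WriteLine($"handled={receiver.Handled.Count}, pending={db.Outbox.Count}");
        // prints "handled=1, pending=0"
    }
}
```

The sketch deliberately sends the message twice to show why de-duplication on the receiving side is part of the pattern rather than an optional extra.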

Handler Semantics

One more comment about your proposal - by convention, UserChangedEmailHandler would more commonly be associated with the service that does something in response to the email being changed, rather than simply participating in the propagation of the information that the email has changed. When you have 50 events being published by your system, do you want 50 different handlers just to push those messages onto different queues?

The systems above use a generic framework to propagate messages via the transport, so you can reserve UserChangedEmailHandler for the subscribing system and include in it the business logic that should happen whenever a user changes their email.
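
As a rough illustration of that split, the sketch below uses one generic Publish method for every event type and keeps UserChangedEmailHandler on the subscribing side. InMemoryBus is a hypothetical class invented for this example and runs in a single process; it is not the API of NServiceBus, MassTransit or JustSaying.

```csharp
using System;
using System.Collections.Generic;

public class UserChangedEmail
{
    public int UserId { get; set; }
    public string NewEmail { get; set; }
}

// Hypothetical generic bus: one Publish method serves every event type,
// so the publishing service needs no per-event "push to queue" handlers.
public class InMemoryBus
{
    private readonly Dictionary<Type, List<Action<object>>> handlers =
        new Dictionary<Type, List<Action<object>>>();

    public void Subscribe<TEvent>(Action<TEvent> handler)
    {
        if (!handlers.TryGetValue(typeof(TEvent), out var list))
            handlers[typeof(TEvent)] = list = new List<Action<object>>();
        list.Add(e => handler((TEvent)e));
    }

    public void Publish<TEvent>(TEvent @event)
    {
        if (handlers.TryGetValue(typeof(TEvent), out var list))
            foreach (var handler in list) handler(@event);
    }
}

// Lives in the subscribing service and holds the business reaction to the event.
public class UserChangedEmailHandler
{
    public List<string> SentConfirmations { get; } = new List<string>();

    public void Handle(UserChangedEmail message) =>
        SentConfirmations.Add($"confirm {message.NewEmail} for user {message.UserId}");
}

public static class HandlerDemo
{
    public static void Main()
    {
        var bus = new InMemoryBus();
        var handler = new UserChangedEmailHandler();
        bus.Subscribe<UserChangedEmail>(handler.Handle);

        bus.Publish(new UserChangedEmail { UserId = 42, NewEmail = "new@example.com" });
        Console.WriteLine(handler.SentConfirmations[0]);
        // prints "confirm new@example.com for user 42"
    }
}
```

Adding a 51st event type then means adding a new event class and whichever subscriber handlers care about it, with no change to the publishing mechanics.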

Chris Simon answered Dec 21 '22 23:12