
How to implement a microservice Event Driven architecture with Spring Cloud Stream Kafka and Database per service

I am trying to implement an event-driven architecture to handle distributed transactions. Each service has its own database and uses Kafka to publish messages that inform the other microservices about its operations.

An example:

 Order service -------> | Kafka |------->Payment Service
       |                                       |
Orders MariaDB DB                   Payment MariaDB Database

The Order Service receives an order request. It has to store the new order in its DB and publish a message so that the Payment Service knows it has to charge for the item:

@Autowired
private OrderBusiness orderBusiness;

@Autowired
private OrderSource orderSource; // Source binding used to publish to the orders topic

@PostMapping
public Order createOrder(@RequestBody Order order) {
    logger.debug("createOrder()");
    // a. Save the order in the DB
    orderBusiness.createOrder(order);
    // b. Publish to the topic so that the Payment Service charges for the item
    try {
        orderSource.output().send(MessageBuilder.withPayload(order).build());
    } catch (Exception e) {
        logger.error("Error publishing order", e);
    }
    return order;
}

These are my questions:

  1. Steps a.- (save in Order DB) and b.- (publish the message) should be performed in a transaction, atomically. How can I achieve that?
  2. This is related to the previous one: I send the message with: orderSource.output().send(MessageBuilder.withPayload(order).build()); This operation is asynchronous and ALWAYS returns true, no matter whether the Kafka broker is down. How can I know that the message has reached the Kafka broker?
asked Feb 09 '17 by codependent


1 Answer

Steps a.- (save in Order DB) and b.- (publish the message) should be performed in a transaction, atomically. How can I achieve that?

Kafka currently does not support transactions (and thus no rollback or commit either), which you would need to synchronize something like this. So in short: you can't do what you want to do. This will change in the near-ish future, once KIP-98 is merged, but that may still take some time. Also, even with transactions in Kafka, an atomic transaction across two systems is a very hard thing to get right; everything that follows will only be improved upon by transactional support in Kafka, but it will still not entirely solve your issue. For that you would need to look into implementing some form of two-phase commit across your systems.

You can get somewhat close by configuring producer properties, but in the end you will have to choose between at-least-once or at-most-once semantics for one of your systems (MariaDB or Kafka).

Let's start with what you can do in Kafka to ensure delivery of a message, and further down we'll dive into your options for the overall process flow and what the consequences are.

Guaranteed delivery

You can configure how many brokers have to confirm receipt of your messages before the request is returned to you with the parameter acks: by setting this to all you tell the broker to wait until all replicas have acknowledged your message before returning an answer to you. This is still no 100% guarantee that your message will not be lost, since at that point it has only been written to the page cache, and there are theoretical scenarios with a broker failing before it is persisted to disk where the message might still be lost. But this is as good a guarantee as you are going to get. You can further reduce the risk of data loss by lowering the interval at which brokers force an fsync to disk (log.flush.interval.messages and/or log.flush.interval.ms), but please be aware that these values can bring heavy performance penalties with them.
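
For illustration, a producer configuration along the following lines requests acknowledgement from all in-sync replicas before a send is reported as successful. This is only a sketch: the broker address and retry count are example values, and myConfig is named to match the snippet further down; all properties used are standard Kafka producer settings.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

Properties myConfig = new Properties();
myConfig.put("bootstrap.servers", "localhost:9092");   // example broker address
myConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
myConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
myConfig.put("acks", "all");                            // wait until all in-sync replicas have acknowledged the write
myConfig.put("retries", 3);                             // example value: retry transient send failures
Producer<String, String> producer = new KafkaProducer<>(myConfig);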

In addition to these settings, you will need to wait for your Kafka producer to return the response to your request and check whether an exception occurred. This ties into the second part of your question, so I will go into that further down. If the response is clean, you can be as sure as possible that your data got to Kafka and start worrying about MariaDB.

Everything we have covered so far only addresses how to ensure that Kafka got your messages. But you also need to write data into MariaDB, and this can fail as well, which would make it necessary to recall a message you potentially already sent to Kafka - and this you can't do.

So basically you need to choose one system in which you are better able to deal with duplicates/missing values (depending on whether or not you resend partial failures), and that will influence the order in which you do things.

Option 1

Kafka first

In this option you initialize a transaction in MariaDB, then send the message to Kafka, wait for a response, and if the send was successful you commit the transaction in MariaDB. Should sending to Kafka fail, you can roll back your transaction in MariaDB and everything is dandy. If, however, sending to Kafka is successful and your commit to MariaDB fails for some reason, then there is no way of getting the message back from Kafka. So you will either be missing a message in MariaDB or have a duplicate message in Kafka if you resend everything later on.
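
As a rough sketch of that flow (an editorial illustration using the plain producer API and JDBC; dataSource, insertOrder, orderKey and orderJson are assumed names, and with Spring Cloud Stream you would additionally need something like the Kafka binder's synchronous producer mode so that the send actually reports failures):

// Sketch only: open the MariaDB transaction, write the order, then send to Kafka and
// wait for the broker response; commit only if the send succeeded, otherwise roll back.
try (Connection con = dataSource.getConnection()) {         // dataSource: assumed MariaDB DataSource
    con.setAutoCommit(false);
    insertOrder(con, order);                                // a. assumed helper, writes inside the open transaction
    try {
        producer.send(new ProducerRecord<>("orders", orderKey, orderJson))  // orderKey/orderJson: assumed serialized form
                .get(10, TimeUnit.SECONDS);                 // b. block until Kafka acknowledges (or fails)
    } catch (Exception e) {
        con.rollback();                                     // Kafka never got the message, so drop the DB write too
        throw new RuntimeException("Order could not be published, transaction rolled back", e);
    }
    con.commit();   // Kafka has the message; if this commit fails you are in the "duplicate in Kafka" case described above
}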

Option 2

MariaDB first

This is pretty much just the other way around, but you are probably better able to delete a message that was written in MariaDB, depending on your data model.
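
A correspondingly rough sketch of this variant (again, all helper names are made up for illustration): commit the order in MariaDB first, then publish, and compensate if publishing fails.

// Sketch only: the order is written and committed first; if the Kafka send then fails,
// a compensating action removes (or could simply flag) the already committed row.
saveAndCommitOrder(order);                                  // assumed helper: writes and commits in MariaDB
try {
    producer.send(new ProducerRecord<>("orders", orderKey, orderJson))
            .get(10, TimeUnit.SECONDS);
} catch (Exception e) {
    deleteOrder(order);                                     // assumed compensating helper
    // alternatively: mark the row as "unpublished" and retry the send later
}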

Of course you can mitigate both approaches by keeping track of failed sends and retrying just those later on, but all of that is more of a band-aid on the bigger issue.

Personally I'd go with approach 1, since the chance of a commit failing should be somewhat smaller than the chance of the send itself failing, and I'd implement some sort of dupe check on the other side of Kafka.
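
That dupe check could be as simple as the sketch below on the consuming (Payment Service) side, where processedOrderIds is an assumed store, for example a table in the Payment MariaDB keyed by a unique order id carried in the message (order.getId() is an assumed getter):

// Sketch only: idempotent message handling in the Payment Service.
void handleOrderMessage(Order order) {
    if (processedOrderIds.contains(order.getId())) {        // assumed lookup against the Payment DB
        return;                                             // duplicate delivery, the item was already charged for
    }
    chargeForOrder(order);                                   // assumed business call
    processedOrderIds.add(order.getId());                    // ideally done in the same DB transaction as the charge
}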


This is related to the previous one: I send the message with: orderSource.output().send(MessageBuilder.withPayload(order).build()); This operation is asynchronous and ALWAYS returns true, no matter whether the Kafka broker is down. How can I know that the message has reached the Kafka broker?

Now first off, I'll admit that I am unfamiliar with Spring, so this may not be of use to you, but the following code snippet illustrates one way of checking produce responses for exceptions. By calling flush you block until all sends have finished (and either failed or succeeded), and can then check the results.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

Producer<String, String> producer = new KafkaProducer<>(myConfig);
// Callbacks are executed on the producer's I/O thread, so collect exceptions in a thread-safe list.
final List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>());

for (MessageType message : messages) {
  producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
      if (exception != null) {
        exceptionList.add(exception);
      }
    }
  });
}

// flush() blocks until every outstanding send has completed, successfully or not.
producer.flush();

if (!exceptionList.isEmpty()) {
  // at least one send failed - handle it, e.g. roll back the DB transaction or retry
}
answered Sep 18 '22 by Sönke Liebau