Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring Kafka and exactly once delivery guarantee

I use Spring Kafka and Spring Boot and just wondering how to configure my consumer, for example:

@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {

    // do some logic

    ack.acknowledge();
}

to use the exactly once delivery guarantee?

Should I only add org.springframework.transaction.annotation.Transactional annotation over sendPost method and that's it or do I need to perform some extra steps in order to achieve this?

UPDATED

This is my current config

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties, KafkaTransactionManager<Object, Object> transactionManager) {

        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        //factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setTransactionManager(transactionManager);
        factory.setConsumerFactory(consumerFactory(kafkaProperties));

        return factory;
    }


    @Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);

        return props;
    }

    @Bean
    public ProducerFactory<String, Post> postProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Post> postKafkaTemplate() {
        return new KafkaTemplate<>(postProducerFactory());
    }

    @Bean
    public ProducerFactory<String, Update> updateProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Update> updateKafkaTemplate() {
        return new KafkaTemplate<>(updateProducerFactory());
    }

    @Bean
    public ProducerFactory<String, Message> messageProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Message> messageKafkaTemplate() {
        return new KafkaTemplate<>(messageProducerFactory());
    }

but it fails with the following error:

***************************
APPLICATION FAILED TO START
***************************

Description:

Parameter 0 of method kafkaTransactionManager in org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration required a single bean, but 3 were found:
    - postProducerFactory: defined by method 'postProducerFactory' in class path resource [com/example/domain/configuration/messaging/KafkaProducerConfig.class]
    - updateProducerFactory: defined by method 'updateProducerFactory' in class path resource [com/example/domain/configuration/messaging/KafkaProducerConfig.class]
    - messageProducerFactory: defined by method 'messageProducerFactory' in class path resource [com/example/domain/configuration/messaging/KafkaProducerConfig.class]

What am I doing wrong ?

like image 305
alexanoid Avatar asked Sep 29 '18 16:09

alexanoid


People also ask

Does Kafka guarantee exactly once delivery?

This means that messages should never be lost, although the producer may duplicate work (which may also result in duplicate outputs). Exactly once delivery: The message will be delivered exactly one time. Failures and retries may occur, but the consumer is guaranteed to only receive a given message once.

How do you handle exactly once in Kafka?

This exactly once process works by committing the offsets by producers instead of consumer. i.e., the produce of result to kafka and committing the consumed messages all are done by kafka producer (instead of separate kafka consumer and producer) which brings the exactly once.

Which Kafka API supports exactly once delivery?

This problem becomes even harder when across two instances of a distributed system. Apache Kafka has supported “Exactly-once” (a.k.a. transaction) in the context of one instance or one cluster three years ago and kept iterating over that time: KIP-447 KIP-360 KIP-588.

How does Kafka guarantee at least once?

At least once An application sends a batch of messages to Kafka. The application never receives a response so sends the batch again. In this case it may have been the first batch was successfully saved, but the acknowledgement was lost, so the messages end up being added twice. 2.


1 Answers

You should not use manual acknowledgments. Instead, inject a KafkaTransactionManager into the listener container and the container will send the offset to the transaction when the listener method exits normally (or rollback otherwise).

You should not do acks via the consumer for exactly once.

EDIT

application.yml

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        isolation:
          level: read_committed
    producer:
      transaction-id-prefix: myTrans.

App

@SpringBootApplication
public class So52570118Application {

    public static void main(String[] args) {
        SpringApplication.run(So52570118Application.class, args);
    }

    @Bean // override boot's auto-config to add txm
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTransactionManager<Object, Object> transactionManager) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setTransactionManager(transactionManager);
        return factory;
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    @KafkaListener(id = "so52570118", topics = "so52570118")
    public void listen(String in) throws Exception {
        System.out.println(in);
        Thread.sleep(5_000);
        this.template.send("so52570118out", in.toUpperCase());
        System.out.println("sent");
    }

    @KafkaListener(id = "so52570118out", topics = "so52570118out")
    public void listenOut(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner() {
        return args -> this.template.executeInTransaction(t -> t.send("so52570118", "foo"));
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("so52570118", 1, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("so52570118out", 1, (short) 1);
    }

}
like image 178
Gary Russell Avatar answered Oct 03 '22 04:10

Gary Russell