Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Write to two Kafka topics in a single transaction using Spring Kafka

I'm trying to work out if there's a way of using Kafka's transaction feature to write to two topics within a transaction.

I know the typical scenario to use Kafka's transactions is in a consumer-producer pattern and that seems well documented.

What I've tried:

  1. created a KafkaTransactionManager per topic
  2. configured each ProducerFactory to use their respective transaction manager
  3. Created a ChainedTransactionManager with the two instances of KafkaTransactionManager
  4. Created a KafkaTemplate per topic

    I then used the @Transactional(transactionManager = "chainedTx") annotation on a method that does:

    // Both sends are expected to join the transaction opened by the @Transactional method.
    template1.send("topic1", "example payload");
    template2.send("topic2", "example payload");
    

This doesn't work. The KafkaTemplate is transactional, but when the send() method is called, there is no transaction in progress and I get an IllegalStateException.

I was going to try the KafkaTemplate.executeInTransaction() method, but the Javadoc states this is only for local transactions, so it does not appear to fit my needs.

My next step is to try using Kafka's Producer API directly to see if this pattern works, but I'd appreciate it if someone could let me know whether I'm wasting my time and Kafka doesn't support transactionally writing to multiple topics.

I did find this statement in Confluent's blog on Kafka transaction support:

Transactions enable atomic writes to multiple Kafka topics and partitions...

But I haven't found any examples that demonstrate it.

Configuration of the first producer

/**
 * Producer-side wiring for the "control" messages: one producer factory, plus the
 * {@code controlTemplate} and {@code controlTransactionManager} beans built on it.
 */
@Configuration
public class ControlProducerConfig {

    /**
     * Transaction manager for the control producer.
     *
     * @return a manager bound to the shared control producer factory
     */
    @Bean("controlTransactionManager")
    KafkaTransactionManager<String, String> transactionManager() {
        return new KafkaTransactionManager<>(factory());
    }

    /**
     * Template used to publish control messages.
     *
     * @return a template bound to the shared control producer factory
     */
    @Bean("controlTemplate")
    public KafkaTemplate<String, String> template() {
        return new KafkaTemplate<>(factory());
    }

    /**
     * The single producer factory shared by the template and the transaction manager.
     *
     * <p>This has to be a bean method: a private helper is not intercepted by the
     * {@code @Configuration} proxy, so every call would build a new factory. The
     * template would then look for a transaction on a different factory than the one
     * the transaction manager started it on and fail with "no transaction in progress".
     *
     * @return the transactional producer factory for control messages
     */
    @Bean("controlProducerFactory")
    ProducerFactory<String, String> factory() {
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
        // The prefix must be unique per producer factory; factories sharing a prefix
        // generate the same transactional.id values and fence each other's producers.
        factory.setTransactionIdPrefix("control-tx-");
        return factory;
    }

    /**
     * Builds the raw Kafka producer properties.
     *
     * @return a fresh, mutable property map
     */
    private Map<String, Object> config() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");

        props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

        // you can't set idempotence without setting max in flight requests to <= 5
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // transactional.id is intentionally not set here: the producer factory derives
        // it from the transaction id prefix, so a hand-set value would be overridden.

        return props;
    }

}

Configuration of the second producer

/**
 * Producer-side wiring for the "payload" messages: one producer factory, plus the
 * {@code payloadTemplate} and {@code payloadTransactionManager} beans built on it.
 */
@Configuration
public class PayloadProducerConfig {

    /**
     * Transaction manager for the payload producer.
     *
     * @return a manager bound to the shared payload producer factory
     */
    @Bean("payloadTransactionManager")
    KafkaTransactionManager<String, String> transactionManager() {
        return new KafkaTransactionManager<>(factory());
    }

    /**
     * Template used to publish payload messages.
     *
     * @return a template bound to the shared payload producer factory
     */
    @Bean("payloadTemplate")
    public KafkaTemplate<String, String> template() {
        return new KafkaTemplate<>(factory());
    }

    /**
     * The single producer factory shared by the template and the transaction manager.
     *
     * <p>This has to be a bean method: a private helper is not intercepted by the
     * {@code @Configuration} proxy, so every call would build a new factory. The
     * template would then look for a transaction on a different factory than the one
     * the transaction manager started it on and fail with "no transaction in progress".
     *
     * @return the transactional producer factory for payload messages
     */
    @Bean("payloadProducerFactory")
    ProducerFactory<String, String> factory() {
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
        // The prefix must be unique per producer factory; factories sharing a prefix
        // generate the same transactional.id values and fence each other's producers.
        factory.setTransactionIdPrefix("payload-tx-");
        return factory;
    }

    /**
     * Builds the raw Kafka producer properties.
     *
     * @return a fresh, mutable property map
     */
    private Map<String, Object> config() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");

        props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

        // you can't set idempotence without setting max in flight requests to <= 5
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // transactional.id is intentionally not set here: the producer factory derives
        // it from the transaction id prefix, so a hand-set value would be overridden.

        return props;
    }

}

Main class

/**
 * Application entry point. Wires the chained transaction manager, the startup
 * listener and the bean that publishes the two messages.
 */
@EnableTransactionManagement
@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    /**
     * Chains the two Kafka transaction managers under the name {@code chainedTx}.
     *
     * <p>Each manager drives its own producer, so the chain coordinates two separate
     * Kafka transactions rather than a single atomic one.
     *
     * @param controlTransactionManager manager for the control producer
     * @param payloadTransactionManager manager for the payload producer
     * @return the chained manager, control first, payload second
     */
    @Bean("chainedTx")
    public ChainedTransactionManager chained(
        @Qualifier("controlTransactionManager") KafkaTransactionManager<?, ?> controlTransactionManager,
        @Qualifier("payloadTransactionManager") KafkaTransactionManager<?, ?> payloadTransactionManager) {

        return new ChainedTransactionManager(controlTransactionManager, payloadTransactionManager);
    }

    /**
     * Listener that triggers the publication once the application is ready.
     *
     * @param postTwoMessages the publishing bean to run
     * @return the startup listener
     */
    @Bean
    OnStart onStart(PostTwoMessages postTwoMessages) {
        return new OnStart(postTwoMessages);
    }

    /**
     * Bean that sends one control and one payload message.
     *
     * @param controlTemplate template for the control topic
     * @param payloadTemplate template for the payload topic
     * @return the publishing bean
     */
    @Bean
    public PostTwoMessages postTwoMessages(
        @Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
        // Was qualified as "controlTemplate", which injected the control template twice.
        @Qualifier("payloadTemplate") KafkaTemplate<String, String> payloadTemplate) {

        return new PostTwoMessages(controlTemplate, payloadTemplate);
    }

}

On application start

/**
 * Runs the two-message publication as soon as the application reports it is ready.
 */
public class OnStart implements ApplicationListener<ApplicationReadyEvent> {

    /** The task to execute on startup. */
    private final PostTwoMessages publisher;

    /**
     * @param postTwoMessages the publishing task to invoke when the application is ready
     */
    public OnStart(PostTwoMessages postTwoMessages) {
        publisher = postTwoMessages;
    }

    /**
     * Invokes the publishing task; the event itself is not inspected.
     *
     * @param event the application-ready notification
     */
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        publisher.run();
    }

}

Posting the two messages

/**
 * Publishes a control message and a payload message that share one random UUID,
 * inside a transaction managed by the {@code chainedTx} transaction manager.
 */
public class PostTwoMessages {

    private static final String CONTROL_TOPIC = "private.s0869y.trx.model3a";
    private static final String PAYLOAD_TOPIC = "private.s0869y.trx.model3b";

    private final KafkaTemplate<String, String> controlTemplate;
    private final KafkaTemplate<String, String> payloadTemplate;

    /**
     * @param controlTemplate template used for the control topic
     * @param payloadTemplate template used for the payload topic
     */
    public PostTwoMessages(
        @Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
        @Qualifier("payloadTemplate") KafkaTemplate<String, String> payloadTemplate) {

        this.controlTemplate = controlTemplate;
        this.payloadTemplate = payloadTemplate;
    }

    /**
     * Sends the control message first, then the payload message, both tagged with
     * the same freshly generated UUID.
     */
    @Transactional(transactionManager = "chainedTx")
    public void run() {
        final String correlationId = UUID.randomUUID().toString();
        final String controlMessage = "control: " + correlationId;
        final String payloadMessage = "payload: " + correlationId;

        controlTemplate.send(CONTROL_TOPIC, controlMessage);
        payloadTemplate.send(PAYLOAD_TOPIC, payloadMessage);
    }

}

like image 600
SteveD Avatar asked Feb 25 '19 12:02

SteveD


1 Answer

It should work; do you have @EnableTransactionManagement?

However, transactions can't span 2 different producers; you have to do both sends using the same template. Otherwise it's 2 different transactions.

EDIT

Here's an example with a Spring Boot application:

EDIT2

Updated the example to show using a local transaction via executeInTransaction.

/**
 * Demo application: writes to two topics through one {@code KafkaTemplate} inside a
 * single transaction, first via {@code @Transactional} and then via a local
 * transaction, and prints whatever a listener receives from both topics.
 */
@SpringBootApplication
public class So54865968Application {

    public static void main(String[] args) {
        SpringApplication.run(So54865968Application.class, args);
    }

    /**
     * Runs both transactional variants in sequence at startup.
     *
     * @param foo the bean performing the sends
     * @return the startup runner
     */
    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.runInTx();
            System.out.println("Committed 1");
            foo.runInLocalTx();
            System.out.println("Committed 2");
        };
    }

    @Bean
    public Foo foo(KafkaTemplate<String, Object> template) {
        return new Foo(template);
    }

    @Bean
    public Bar bar() {
        return new Bar();
    }

    /** First demo topic: one partition, replication factor 1. */
    @Bean
    public NewTopic topic1() {
        return new NewTopic("so54865968-1", 1, (short) 1);
    }

    /** Second demo topic: one partition, replication factor 1. */
    @Bean
    public NewTopic topic2() {
        return new NewTopic("so54865968-2", 1, (short) 1);
    }

    /**
     * Sends one record to each topic through the same template, so both sends
     * belong to the same transaction.
     */
    public static class Foo {

        private final KafkaTemplate<String, Object> template;

        public Foo(KafkaTemplate<String, Object> template) {
            this.template = template;
        }

        /**
         * Sends an Integer and a String inside a transaction started by the
         * {@code kafkaTransactionManager} bean (presumably the auto-configured one;
         * it is not declared in this class), then pauses before returning so the
         * delay before the commit is observable.
         *
         * @throws InterruptedException if the pause is interrupted
         */
        @Transactional(transactionManager = "kafkaTransactionManager")
        public void runInTx() throws InterruptedException {
            this.template.send("so54865968-1", 42);
            this.template.send("so54865968-2", "texttest");
            System.out.println("Sent 2; waiting a few seconds to commit");
            Thread.sleep(5_000);
        }

        /**
         * Same two sends, but inside a local transaction run by the template itself.
         * An interrupt during the pause is not propagated; the thread's interrupt
         * flag is restored instead.
         *
         * @throws InterruptedException declared for symmetry with {@link #runInTx()};
         *         the body handles the interrupt itself
         */
        public void runInLocalTx() throws InterruptedException {
            this.template.executeInTransaction(t -> {
                t.send("so54865968-1", 43);
                t.send("so54865968-2", "texttest2");
                System.out.println("Sent 2; waiting a few seconds to commit");
                try {
                    Thread.sleep(5_000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return true;
            });
        }

    }

    /**
     * Listener on both demo topics that prints each received value.
     */
    public static class Bar {

        /**
         * Prints the record value as an int when it is exactly four bytes long and
         * as text otherwise. The length check is a demo heuristic: a four-byte
         * string would also be printed as an int.
         *
         * @param bytes the raw record value
         */
        @KafkaListener(id = "foo", topics = { "so54865968-1", "so54865968-2" })
        public void haandler(byte[] bytes) {
            if (bytes.length == Integer.BYTES) {
                ByteBuffer bb = ByteBuffer.wrap(bytes);
                System.out.println("Received int " + bb.getInt());
            }
            else {
                // Decode explicitly as UTF-8 (StringSerializer's default encoding)
                // instead of relying on the platform default charset.
                System.out.println("Received string "
                        + new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
            }
        }

    }

}

and

# Producer: transaction id prefix used for the transactional sends.
spring.kafka.producer.transaction-id-prefix=tx-id
# Values go through the custom CompositeSerializer (Integer or String payloads).
spring.kafka.producer.properties.value.serializer=com.example.CompositeSerializer

# Consumer: offsets are not auto-committed by the Kafka client.
spring.kafka.consumer.enable-auto-commit=false
# Start from the earliest offset when no committed offset exists.
spring.kafka.consumer.auto-offset-reset=earliest
# Only read records from committed transactions.
spring.kafka.consumer.properties.isolation.level=read_committed
# Deliver values as raw byte[]; the listener decodes them itself.
spring.kafka.consumer.properties.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

and

/**
 * Serializer that delegates {@code Integer} values to an {@code IntegerSerializer}
 * and treats every other value as a {@code String}.
 */
public class CompositeSerializer implements Serializer<Object> {

    private final IntegerSerializer intDelegate = new IntegerSerializer();

    private final StringSerializer textDelegate = new StringSerializer();

    /** No configuration is applied. */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    /**
     * Picks the delegate by the runtime type of {@code data}.
     *
     * @param topic the destination topic, passed through to the delegate
     * @param data the value to serialize; anything that is not an {@code Integer}
     *        is cast to {@code String}
     * @return the bytes produced by the chosen delegate
     */
    @Override
    public byte[] serialize(String topic, Object data) {
        if (data instanceof Integer) {
            return intDelegate.serialize(topic, (Integer) data);
        }
        return textDelegate.serialize(topic, (String) data);
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }

}

and

Received int 42
Received string texttest

Both showed up after the 5 second pause.

like image 71
Gary Russell Avatar answered Oct 02 '22 13:10

Gary Russell