I'm trying to work out if there's a way of using Kafka's transaction feature to write to two topics within a transaction.
I know the typical scenario to use Kafka's transactions is in a consumer-producer pattern and that seems well documented.
What I've tried:
- Created a KafkaTransactionManager per topic
- Configured each ProducerFactory to use its respective transaction manager
- Created a ChainedTransactionManager with the two instances of KafkaTransactionManager
- Created a KafkaTemplate per topic
I then used the @Transactional(transactionManager = "chainedTx") annotation on a method that does:
template1.send("topic1", "example payload");
template2.send("topic2", "example payload");
This doesn't work. The KafkaTemplate is transactional, but when the send() method is called, there is no transaction in progress and I get an IllegalStateException.
I was going to try the KafkaTemplate.executeInTransaction() method, but the Javadoc states this is only for local transactions, so it does not appear to fit my needs.
My next step is to try using Kafka's Producer API directly to see if this pattern works, but I'd appreciate it if someone could let me know whether I'm wasting my time and Kafka doesn't support transactionally writing to multiple topics.
I did find this statement in Confluent's blog on Kafka transaction support:
Transactions enable atomic writes to multiple Kafka topics and partitions...
But I haven't found any examples that demonstrate it.
Configuration of the first producer
@Configuration
public class ControlProducerConfig {

    @Bean("controlTransactionManager")
    KafkaTransactionManager<String, String> transactionManager() {
        return new KafkaTransactionManager<>(factory());
    }

    @Bean("controlTemplate")
    public KafkaTemplate<String, String> template() {
        return new KafkaTemplate<>(factory());
    }

    private ProducerFactory<String, String> factory() {
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
        factory.setTransactionIdPrefix("abcd");
        return factory;
    }

    private Map<String, Object> config() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");
        props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        // you can't set idempotence without setting max in flight requests to <= 5
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");
        return props;
    }
}
Configuration of the second producer
@Configuration
public class PayloadProducerConfig {

    @Bean("payloadTransactionManager")
    KafkaTransactionManager<String, String> transactionManager() {
        return new KafkaTransactionManager<>(factory());
    }

    @Bean("payloadTemplate")
    public KafkaTemplate<String, String> template() {
        return new KafkaTemplate<>(factory());
    }

    private ProducerFactory<String, String> factory() {
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
        factory.setTransactionIdPrefix("abcd");
        return factory;
    }

    private Map<String, Object> config() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");
        props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        // you can't set idempotence without setting max in flight requests to <= 5
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");
        return props;
    }
}
Main class
@EnableTransactionManagement
@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    @Bean("chainedTx")
    public ChainedTransactionManager chained(
            @Qualifier("controlTransactionManager") KafkaTransactionManager controlTransactionManager,
            @Qualifier("payloadTransactionManager") KafkaTransactionManager payloadTransactionManager) {
        return new ChainedTransactionManager(controlTransactionManager, payloadTransactionManager);
    }

    @Bean
    OnStart onStart(PostTwoMessages postTwoMessages) {
        return new OnStart(postTwoMessages);
    }

    @Bean
    public PostTwoMessages postTwoMessages(
            @Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
            @Qualifier("payloadTemplate") KafkaTemplate<String, String> payloadTemplate) {
        return new PostTwoMessages(controlTemplate, payloadTemplate);
    }
}
On application start
public class OnStart implements ApplicationListener<ApplicationReadyEvent> {

    private PostTwoMessages postTwoMessages;

    public OnStart(PostTwoMessages postTwoMessages) {
        this.postTwoMessages = postTwoMessages;
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        postTwoMessages.run();
    }
}
Posting the two messages
public class PostTwoMessages {

    private final KafkaTemplate<String, String> controlTemplate;
    private final KafkaTemplate<String, String> payloadTemplate;

    public PostTwoMessages(
            @Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
            @Qualifier("payloadTemplate") KafkaTemplate<String, String> payloadTemplate) {
        this.controlTemplate = controlTemplate;
        this.payloadTemplate = payloadTemplate;
    }

    @Transactional(transactionManager = "chainedTx")
    public void run() {
        UUID uuid = UUID.randomUUID();
        controlTemplate.send("private.s0869y.trx.model3a", "control: " + uuid);
        payloadTemplate.send("private.s0869y.trx.model3b", "payload: " + uuid);
    }
}
It should work; do you have @EnableTransactionManagement?
However, transactions can't span 2 different producers; you have to do both sends using the same template. Otherwise it's 2 different transactions.
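To make the "one producer, one transaction" point concrete, here is a toy in-memory model. It is only a sketch under stated assumptions: ToyLog and ToyProducer are hypothetical stand-ins written for this illustration, not Kafka or Spring classes, and they model nothing beyond "sends are buffered until the producer that made them commits".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProducerScopedTx {

    /** In-memory stand-in for a broker: topic name -> committed records. */
    static final class ToyLog {
        private final Map<String, List<String>> topics = new LinkedHashMap<>();

        List<String> committed(String topic) {
            return topics.getOrDefault(topic, List.of());
        }

        void append(String topic, String value) {
            topics.computeIfAbsent(topic, k -> new ArrayList<>()).add(value);
        }
    }

    /** Hypothetical producer: sends are buffered and only reach the log on commit. */
    static final class ToyProducer {
        private final ToyLog log;
        private List<String[]> pending; // null while no transaction is open

        ToyProducer(ToyLog log) {
            this.log = log;
        }

        void begin() {
            pending = new ArrayList<>();
        }

        void send(String topic, String value) {
            if (pending == null) {
                throw new IllegalStateException("no transaction in progress");
            }
            pending.add(new String[] {topic, value});
        }

        void commit() {
            for (String[] record : pending) {
                log.append(record[0], record[1]);
            }
            pending = null;
        }

        void abort() {
            pending = null; // buffered records are discarded
        }
    }

    /** One producer, two topics: a failure before commit discards both records. */
    static ToyLog oneProducer(boolean fail) {
        ToyLog log = new ToyLog();
        ToyProducer producer = new ToyProducer(log);
        producer.begin();
        producer.send("topic1", "control");
        producer.send("topic2", "payload");
        if (fail) {
            producer.abort();
        } else {
            producer.commit();
        }
        return log;
    }

    /** Two producers: each commits on its own, so a late failure leaves a partial write. */
    static ToyLog twoProducers(boolean failSecond) {
        ToyLog log = new ToyLog();
        ToyProducer first = new ToyProducer(log);
        ToyProducer second = new ToyProducer(log);
        first.begin();
        second.begin();
        first.send("topic1", "control");
        second.send("topic2", "payload");
        first.commit(); // already visible, cannot be undone
        if (failSecond) {
            second.abort();
        } else {
            second.commit();
        }
        return log;
    }

    public static void main(String[] args) {
        ToyLog single = oneProducer(true);
        System.out.println(single.committed("topic1") + " " + single.committed("topic2")); // [] []
        ToyLog split = twoProducers(true);
        System.out.println(split.committed("topic1") + " " + split.committed("topic2")); // [control] []
    }
}
```

With a single producer the two records commit or vanish together; with two producers the first commit stands even when the second one fails, which is why both sends need to go through the same template.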
EDIT
Here's an example with a Spring Boot application:
EDIT2
Updated the example to show using a local transaction via executeInTransaction.
@SpringBootApplication
public class So54865968Application {

    public static void main(String[] args) {
        SpringApplication.run(So54865968Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.runInTx();
            System.out.println("Committed 1");
            foo.runInLocalTx();
            System.out.println("Committed 2");
        };
    }

    @Bean
    public Foo foo(KafkaTemplate<String, Object> template) {
        return new Foo(template);
    }

    @Bean
    public Bar bar() {
        return new Bar();
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("so54865968-1", 1, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("so54865968-2", 1, (short) 1);
    }

    public static class Foo {

        private final KafkaTemplate<String, Object> template;

        public Foo(KafkaTemplate<String, Object> template) {
            this.template = template;
        }

        // Both sends go through the same template, so they share one transaction.
        @Transactional(transactionManager = "kafkaTransactionManager")
        public void runInTx() throws InterruptedException {
            this.template.send("so54865968-1", 42);
            this.template.send("so54865968-2", "texttest");
            System.out.println("Sent 2; waiting a few seconds to commit");
            Thread.sleep(5_000);
        }

        // Same two sends, but in a local transaction started by the template itself.
        public void runInLocalTx() throws InterruptedException {
            this.template.executeInTransaction(t -> {
                t.send("so54865968-1", 43);
                t.send("so54865968-2", "texttest2");
                System.out.println("Sent 2; waiting a few seconds to commit");
                try {
                    Thread.sleep(5_000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return true;
            });
        }
    }

    public static class Bar {

        @KafkaListener(id = "foo", topics = { "so54865968-1", "so54865968-2" })
        public void handler(byte[] bytes) {
            // A 4-byte payload is assumed to be an int; anything else is treated as a string.
            if (bytes.length == 4) {
                ByteBuffer bb = ByteBuffer.wrap(bytes);
                System.out.println("Received int " + bb.getInt());
            }
            else {
                System.out.println("Received string " + new String(bytes));
            }
        }
    }
}
and
spring.kafka.producer.transaction-id-prefix=tx-id
spring.kafka.producer.properties.value.serializer=com.example.CompositeSerializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.properties.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
and
public class CompositeSerializer implements Serializer<Object> {

    private final StringSerializer stringSerializer = new StringSerializer();
    private final IntegerSerializer intSerializer = new IntegerSerializer();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        return data instanceof Integer ? intSerializer.serialize(topic, (Integer) data)
                : stringSerializer.serialize(topic, (String) data);
    }

    @Override
    public void close() {
    }
}
and
Received int 42
Received string texttest
Both showed up after the 5 second pause.
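One caveat about the demo listener: it tells ints from strings purely by payload length, which is fine for these test values but would misread any 4-byte string. A minimal stand-alone sketch of that rule, using only the JDK (the class name LengthDispatchCaveat and the describe helper are made up for illustration, and ByteBuffer stands in for the 4-byte big-endian encoding that IntegerSerializer produces):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LengthDispatchCaveat {

    /** Mirrors the listener's rule: exactly 4 bytes is treated as a big-endian int. */
    static String describe(byte[] bytes) {
        if (bytes.length == 4) {
            return "Received int " + ByteBuffer.wrap(bytes).getInt();
        }
        return "Received string " + new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for a serialized Integer: four bytes, big-endian.
        byte[] intBytes = ByteBuffer.allocate(4).putInt(42).array();
        System.out.println(describe(intBytes)); // Received int 42

        // An 8-byte string takes the string branch as intended.
        System.out.println(describe("texttest".getBytes(StandardCharsets.UTF_8))); // Received string texttest

        // A 4-byte string is indistinguishable from an int under this rule.
        System.out.println(describe("test".getBytes(StandardCharsets.UTF_8)));
    }
}
```

For anything beyond a demo, a type marker (for example a record header) or separate topics per type would avoid the ambiguity.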