I am trying to write a kafka consumer application in java on Springboot platform. Earlier, I have written code in plain java but now converting into spring-kafka as it can give some advantage over plain java. I do have few questions that I am trying to understand.
It seems that I don't have to explicitly poll() loop in spring-kafka and it would be handled automatically by @KafkaListener?
I have set enable.auto.commit='false', As I have to do some processing before committing offsets, how can I perform commitAsync() in Spring-Kafka?
ConsumerConfig.java :
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${app.kafka_brokers}")
private String KAFKA_BROKERS;
@Value("${app.topic}")
private String KAFKA_TOPIC;
@Value("${app.group_id_config}")
private String GROUP_ID_CONFIG;
@Value("${app.schema_registry_url}")
private String SCHEMA_REGISTRY_URL;
@Value("${app.offset_reset}")
private String OFFSET_RESET;
@Value("${app.max_poll_records}")
private String MAX_POLL_RECORDS;
@Value("${app.security.protocol}")
private String SSL_PROTOCOL;
@Value("${app.ssl.truststore.password}")
private String SSL_TRUSTSTORE_SECURE;
@Value("${app.ssl.keystore.password}")
private String SSL_KEYSTORE_SECURE;
@Value("${app.ssl.key.password}")
private String SSL_KEY_SECURE;
@Value("${app.ssl.truststore.location}")
private String SSL_TRUSTSTORE_LOCATION_FILE_NAME;
@Value("${app.ssl.keystore.location}")
private String SSL_KEYSTORE_LOCATION_FILE_NAME;
@Bean
public ConsumerFactory<String, String> consumerFactory(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_BROKERS);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,SSL_TRUSTSTORE_LOCATION_FILE_NAME);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,SSL_KEYSTORE_LOCATION_FILE_NAME);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
return factory;
}
}
KafkaConsumer.java :
@Component
public class KafkaConsumer {
@KafkaListener(topics = "topic", groupId = "group")
public void run(ConsumerRecord<String, GenericRecord> record) {
System.out.println(record);
<-- how to asyncCommit()-->
}
}
First of all, I suggest you use the properties and AutoConfiguration set by Spring kafka instead of creating your own as it follows the DRY Principle: Don't Repeat Yourself.
spring:
kafka:
bootstrap-servers: ${app.kafka_brokers}
consumer:
auto-offset-reset: ${app.offset_reset}
enable-auto-commit: false // <---- disable auto committing
ssl:
protocol: ${app.security.protocol}
key-store-location: ${app.ssl.keystore.location}
key-store-password: ${app.ssl.keystore.password}
trust-store-location: ${app.ssl.truststore.location}
trust-store-password: ${app.ssl.truststore.password}
// And other properties
listener:
ack-mode: manual // This is what you need
The AckMode docs: https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.AckMode.html
Essentially, manual is an asynchronous acknowledgment, while manual_immediate is synchronous.
Then inside your @KafkaListener component you can inject org.springframework.kafka.support.Acknowledgment object acknowledge your message.
@Component
public class KafkaConsumer {
@KafkaListener(topics = "topic", groupId = "group")
public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgment) {
System.out.println(record);
acknowledgment.acknowledge();
}
}
Here's the documentation for what can be injected into a @KafkaListener method: https://docs.spring.io/spring-kafka/reference/html/#message-listeners
The listener container will commit the offset when the listener exits normally, depending on the container's AckMode property; AckMode.BATCH (default) means the offsets for all the records returned by the poll will be committed after they have all been processed, AckMode.RECORD means each offset will be committed as soon as the listener exits.
sync Vs. async is controlled by the syncCommits container property.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With