I am currently using Spring Integration Kafka to compute real-time statistics. However, because of the group name, Kafka delivers all the previous values that the listener has not read yet.
@Value("${kafka.consumer.group.id}")
private String consumerGroupId;

// Values are deserialized with ByteArrayDeserializer, so the value type is byte[].
@Bean
public ConsumerFactory<String, byte[]> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}

public Map<String, Object> getDefaultProperties() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return properties;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaMessageListener listener() {
    return new KafkaMessageListener();
}
I would like to start at the latest offset and not be bothered by old values. Is it possible to reset the offset of the group?
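Use kafka-consumer-groups.sh to change or reset the offset. You have to specify the topic and the consumer group and pass the --reset-offsets flag, for example together with --to-latest to move the group to the end of each partition. Add --execute to apply the change; without it the tool only prints the offsets it would set. The consumer group must be inactive while its offsets are reset.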
With kafka-python, you can use end_offsets, which gets the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1. This method does not change the current consumer position of the partitions.
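To make the "last available message + 1" definition concrete, here is a minimal sketch in plain Java. It is a toy model, not the Kafka or kafka-python API: the class EndOffsetSketch and its methods are hypothetical names, and a partition is simply modeled as an in-memory list. It shows that a consumer positioned at the end offset skips everything already in the partition and only sees records appended afterwards.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition plus one consumer position (hypothetical, not a Kafka class).
public class EndOffsetSketch {
    private final List<String> log = new ArrayList<>();
    private int position = 0; // offset of the next record the consumer will read

    public void append(String record) {
        log.add(record);
    }

    // End offset = offset of the last available record + 1 = offset of the upcoming record.
    public int endOffset() {
        return log.size();
    }

    public void seek(int offset) {
        position = offset;
    }

    // Returns every record from the current position onward and advances the position.
    public List<String> poll() {
        List<String> records = new ArrayList<>(log.subList(position, log.size()));
        position = log.size();
        return records;
    }

    public static void main(String[] args) {
        EndOffsetSketch partition = new EndOffsetSketch();
        partition.append("old-1");
        partition.append("old-2");
        partition.append("old-3");

        // Offsets 0..2 are taken, so the end offset is 3.
        partition.seek(partition.endOffset());

        partition.append("new-1");
        System.out.println(partition.poll()); // prints [new-1]
    }
}
```

In this model, reading the end offset does not move the position by itself; only the explicit seek does, which mirrors the note above that end_offsets leaves the consumer position unchanged.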
Public Interfaces. The default group id will change from "" to null so that consumers that want to use an empty ( "" ) group id would have to explicitly specify it. The use of empty group id will be deprecated on the client, and consumers using this group id will receive a warning about this deprecation.
Because I didn't see any example of this, I'm going to explain how I did it here.
The class containing your @KafkaListener must implement the ConsumerSeekAware interface, which lets the listener control offset seeking when partitions are assigned (source: https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek ).
public class KafkaMessageListener implements ConsumerSeekAware {

    @KafkaListener(topics = "your.topic")
    public void listen(byte[] payload) {
        // ...
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }
}
Here, on a rebalance, we use the given callback to seek to the end of every assigned partition. Thanks to Artem Bilan ( https://stackoverflow.com/users/2756547/artem-bilan ) for guiding me to the answer.
You can pass a ConsumerRebalanceListener to the Kafka consumer when subscribing to topics. In it, you can get the latest offset of each partition with the KafkaConsumer.endOffsets() method and apply it to the consumer with the KafkaConsumer.seek() method, like this:
kafkaConsumer.subscribe(Collections.singletonList(topics),
    new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // do nothing
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // get the latest offset of each partition and seek to it
            kafkaConsumer.endOffsets(partitions)
                .forEach((partition, offset) -> kafkaConsumer.seek(partition, offset));
        }
    }
);
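Alternatively, you can always consume the latest messages without committing group offsets by specifying the properties {"enable.auto.commit:false", "auto.offset.reset:latest"} on the KafkaListener annotation. Note that auto.offset.reset only takes effect when the group has no committed offset for a partition, so this approach relies on no offsets being stored for the group.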
@KafkaListener(id = "example-group",
properties = {"enable.auto.commit:false", "auto.offset.reset:latest"},
topics = "example")
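Why the group must have no committed offsets for this to work can be illustrated with a small sketch in plain Java. This is a simplified toy model of how a starting position is resolved, not Kafka client code: StartPositionSketch, ResetPolicy and startPosition are hypothetical names, and the out-of-range case is ignored. The point it shows is that a committed offset takes precedence and the reset policy is only a fallback.

```java
import java.util.OptionalLong;

// Toy model of start-position resolution (hypothetical, not a Kafka class).
public class StartPositionSketch {

    public enum ResetPolicy { EARLIEST, LATEST }

    // A committed offset always wins; the reset policy applies only when none exists.
    public static long startPosition(OptionalLong committed,
                                     long beginningOffset,
                                     long endOffset,
                                     ResetPolicy policy) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        return policy == ResetPolicy.LATEST ? endOffset : beginningOffset;
    }

    public static void main(String[] args) {
        // Assume a partition holding offsets 0..99, so the end offset is 100.
        long fresh = startPosition(OptionalLong.empty(), 0, 100, ResetPolicy.LATEST);
        long resumed = startPosition(OptionalLong.of(40), 0, 100, ResetPolicy.LATEST);

        System.out.println(fresh);   // prints 100: no committed offset, "latest" applies
        System.out.println(resumed); // prints 40: the committed offset is used, old records are replayed
    }
}
```

If the group does have stored offsets, seeking to the end on assignment or resetting the group's offsets, as described in the other answers, is what actually moves the position.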