I wrote a Java program to consume messages from Kafka. I want to monitor the consumer lag; how can I get it in Java?
BTW, I use:
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.1</version>
Thanks in advance.
The simplest way to check the offsets and lag of a given consumer group is with the CLI tools shipped with Kafka, e.g. `bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group`. The output shows the current offset, log-end offset, and lag for each partition within the topic.
You can set a `SetStatisticsHandler` callback when creating the consumer. For example, with the C# (Confluent.Kafka) client:
var config = new ConsumerConfig()
{
    BootstrapServers = entrypoints,
    GroupId = groupid,
    EnableAutoCommit = false,
    StatisticsIntervalMs = 1000 // statistics emission interval
};

var consumer = new ConsumerBuilder<Ignore, byte[]>(config)
    .SetStatisticsHandler((consumer, json) => {
        logger.LogInformation(json); // statistics metrics, including consumer lag
    })
    .Build();
For details, please refer to the statistics metrics described in librdkafka's STATISTICS.md.
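The Java client has no statistics callback, but `KafkaConsumer` exposes comparable metrics (including `records-lag-max`, the maximum lag over the consumer's assigned partitions) through its `metrics()` method. A minimal sketch, assuming a broker at `localhost:9092`, a group named `my-group`, and a client version ≥ 1.0 (older 0.10.x clients use `Metric.value()` instead of `metricValue()`):

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LagMetrics {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: your broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // assumption: your group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // In a real consumer you would subscribe() and poll() here; the lag
            // metrics are populated by the fetcher as records arrive.
            for (Map.Entry<MetricName, ? extends Metric> e : consumer.metrics().entrySet()) {
                if (e.getKey().name().contains("records-lag")) {
                    System.out.println(e.getKey().name() + " = " + e.getValue().metricValue());
                }
            }
        }
    }
}
```

Note that these metrics only cover partitions currently assigned to this consumer instance; they report `NaN` until the consumer has actually fetched records.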
If you don't want to add the kafka (and Scala) dependencies to your project, you can use the class below. It needs only the kafka-clients dependency.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
public class KafkaConsumerMonitor {

    public static class PartitionOffsets {
        private final long endOffset;
        private final long currentOffset;
        private final int partition;
        private final String topic;

        public PartitionOffsets(long endOffset, long currentOffset, int partition, String topic) {
            this.endOffset = endOffset;
            this.currentOffset = currentOffset;
            this.partition = partition;
            this.topic = topic;
        }

        public long getEndOffset() {
            return endOffset;
        }

        public long getCurrentOffset() {
            return currentOffset;
        }

        public int getPartition() {
            return partition;
        }

        public String getTopic() {
            return topic;
        }
    }

    private final String monitoringConsumerGroupID = "monitoring_consumer_" + UUID.randomUUID().toString();

    public Map<TopicPartition, PartitionOffsets> getConsumerGroupOffsets(String host, String topic, String groupId) {
        Map<TopicPartition, Long> logEndOffset = getLogEndOffset(topic, host);

        KafkaConsumer<?, ?> consumer = createNewConsumer(groupId, host);
        try {
            BinaryOperator<PartitionOffsets> mergeFunction = (a, b) -> {
                throw new IllegalStateException("duplicate partition key");
            };
            return logEndOffset.entrySet()
                    .stream()
                    .collect(Collectors.toMap(
                            Map.Entry::getKey,
                            entry -> {
                                // committed() returns null if the group has no
                                // committed offset for this partition yet
                                OffsetAndMetadata committed = consumer.committed(entry.getKey());
                                long currentOffset = committed == null ? 0 : committed.offset();
                                return new PartitionOffsets(entry.getValue(), currentOffset,
                                        entry.getKey().partition(), topic);
                            }, mergeFunction));
        } finally {
            consumer.close();
        }
    }

    public Map<TopicPartition, Long> getLogEndOffset(String topic, String host) {
        Map<TopicPartition, Long> endOffsets = new ConcurrentHashMap<>();
        KafkaConsumer<?, ?> consumer = createNewConsumer(monitoringConsumerGroupID, host);
        List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
        List<TopicPartition> topicPartitions = partitionInfoList.stream()
                .map(pi -> new TopicPartition(topic, pi.partition()))
                .collect(Collectors.toList());

        consumer.assign(topicPartitions);
        consumer.seekToEnd(topicPartitions);
        topicPartitions.forEach(topicPartition -> endOffsets.put(topicPartition, consumer.position(topicPartition)));

        consumer.close();
        return endOffsets;
    }

    private static KafkaConsumer<?, ?> createNewConsumer(String groupId, String host) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new KafkaConsumer<>(properties);
    }
}
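Once you have the log-end offset and the committed offset for a partition, the lag is just their difference. A minimal, Kafka-free sketch of aggregating total lag from such per-partition pairs (the `lag` helper and the sample numbers are purely illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class LagCalc {
    // lag per partition = log-end offset minus committed offset, floored at 0
    public static long lag(long endOffset, long committedOffset) {
        return Math.max(0, endOffset - committedOffset);
    }

    public static void main(String[] args) {
        // partition -> {logEndOffset, committedOffset}
        Map<Integer, long[]> offsets = new HashMap<>();
        offsets.put(0, new long[]{100, 90});
        offsets.put(1, new long[]{50, 50});

        long total = 0;
        for (long[] pair : offsets.values()) {
            total += lag(pair[0], pair[1]);
        }
        System.out.println(total); // prints 10
    }
}
```

In practice you would feed this from the map returned by a monitor class like the one above, and alert when the total (or any per-partition lag) stays above a threshold.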