I'm building a Java 8 application that queries a Kafka topic for exactly one message. Each request creates a new Consumer object (independent of any existing Consumer objects), which polls my Kafka topic, gets one record, and is then closed. This happens ~200k times per day, and each request is independent of all others, so I don't think I can reuse consumers. Basically, a user requests a message from the topic, a consumer is created for them, and then it is closed. On average this happens ~2 times per second, but the rate is arbitrary: it can be 10 times/s or 1 time/hour, and there's no way to know.
After a while, the heap size on the Kafka server (not the server running the code, but the actual server running Kafka) gets huge and garbage collection can't clear it. Eventually, more CPU time is dedicated to GC than anything else, and everything crashes until I restart Kafka.
Here's an approximate version of the code that's causing the issue, with a while(true) loop approximating the real behaviour (in production, the consumers are not created in a loop, but on demand when a user requests a message from the topic):
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "SERVER_IP:9092");
props.put("session.timeout.ms", 30000);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 1000);
// Key/value deserializers are required for the consumer to be constructed
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

while (true) {
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    TopicPartition tp = new TopicPartition("TOPIC", 0);
    consumer.assign(Arrays.asList(tp));
    consumer.seekToEnd(Arrays.asList(tp));
    // I've narrowed down the memory leak to this line
    ConsumerRecords<String, String> cr = consumer.poll(1000);
    // If I remove this line ^, the memory leak does not happen
    /* CODE TO GET ONE RECORD */
    consumer.unsubscribe();
    consumer.close();
}
Running this code on 20 JVMs leads to a memory leak in about 20 minutes. Here's what the heap usage (blue) and GC pause time (green) on the Kafka server look like:
Am I doing something wrong (or is there a better way to approach this), or is this a bug in Kafka when a lot of consumers are created and closed?
I'm running Kafka 0.10.2.1 on the client side and Kafka 0.10.2.0 on the server.
Regardless of the number and frequency of requests you receive, you can still reuse KafkaConsumer instances. You can poll only when a request arrives; you don't need to create and close a consumer each time.
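As an illustration, here is a minimal sketch of that approach, assuming the same topic, partition, timeout and properties as in the question; the class and method names (LatestMessageReader, readLatest) are hypothetical, not part of the Kafka API. Since KafkaConsumer is not thread-safe, the sketch serializes concurrent requests with synchronized methods:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class LatestMessageReader implements AutoCloseable {
    private final Consumer<String, String> consumer;
    private final TopicPartition tp = new TopicPartition("TOPIC", 0);

    public LatestMessageReader(Properties props) {
        // One consumer created once and reused for every request
        consumer = new KafkaConsumer<>(props);
        consumer.assign(Collections.singletonList(tp));
    }

    // Called once per user request; mirrors the question's seekToEnd + poll logic
    public synchronized ConsumerRecords<String, String> readLatest() {
        consumer.seekToEnd(Collections.singletonList(tp));
        return consumer.poll(1000);
    }

    @Override
    public synchronized void close() {
        consumer.close();
    }
}

Each user request would then call readLatest() on the one shared instance instead of constructing and closing a new KafkaConsumer every time.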
Having said that, your usage of the consumers might have revealed a memory management issue on the broker if memory usage keeps increasing and is not reclaimed by GC. I have seen issues reporting that the broker runs out of direct memory when producers are recycled very frequently, so there is likely scope for improvement there. It's probably best to raise a ticket at issues.apache.org so it can be looked at.