Kafka broker memory leak triggered by many consumers

I'm building a Java 8 application that queries a Kafka topic for exactly one message. Each request creates a new Consumer object (independent of any existing Consumer objects), which polls my Kafka topic, gets one record, and is then closed. This happens ~200k times per day, and each request is independent of all others, so I don't think I can reuse consumers. Basically, a user requests a message from the topic, a consumer is created for them, and then it is closed. This happens on average ~2 times per second, but the rate is arbitrary: it can be 10 times/s or once an hour, and there's no way to know in advance.

After a while, the heap size on the Kafka server (not the server running the code, but the actual server running Kafka) gets huge and garbage collection can't clear it. Eventually, more CPU time is dedicated to GC than anything else, and everything crashes until I restart Kafka.

Here's an approximate version of the code that's causing the issue, with a while(true) loop approximating the real behaviour (in production, the consumers are not created in a loop, but on demand whenever a user requests a message from the topic):

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "SERVER_IP:9092");
props.put("session.timeout.ms", 30000);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 1000);
// Key/value deserializers are required consumer config; the consumer won't construct without them
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

while (true) {
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    TopicPartition tp = new TopicPartition("TOPIC", 0);
    consumer.assign(Arrays.asList(tp));
    consumer.seekToEnd(Arrays.asList(tp));

    // I've narrowed down the memory leak to this line.
    // If I remove it, the memory leak does not happen.
    ConsumerRecords<String, String> cr = consumer.poll(1000);

    /* CODE TO GET ONE RECORD */

    consumer.unsubscribe();
    consumer.close();
}

Running this code on 20 JVMs leads to a memory leak in about 20 minutes. Here's what the heap (blue) and GC pause time (green) on the Kafka server look like:

[Chart: Kafka server heap usage (blue) and GC pause time (green) growing over time]

Am I doing something wrong (or is there a better way to approach this), or is this a bug in Kafka when a lot of consumers are created and closed?

I'm running Kafka 0.10.2.1 on the client side and Kafka 0.10.2.0 on the server.

Asked by Bogdan on Nov 08 '22 at 21:11

1 Answer

Regardless of the number and frequency of requests you receive, you can still reuse KafkaConsumer instances. Poll only when a request arrives; you don't need to create and close a consumer each time.

Having said that, your usage pattern might have revealed a memory management issue on the broker, if memory usage keeps increasing and is never reclaimed by GC. I have seen issues reporting that the broker runs out of direct memory when producers are recycled very frequently, so there is likely scope for improvement there. It's probably best to raise a ticket at issues.apache.org to have it looked at.

Answered by Michal Borowiecki on Nov 15 '22 at 10:11