
Kafka Consumers throwing java.lang.OutOfMemoryError: Direct buffer memory

I am using a single-node Kafka broker (0.10.2) and a single-node ZooKeeper (3.4.9). My consumer server has a single core and 1.5 GB of RAM. Whenever I run a process with 5 or more threads, the consumer threads are killed after throwing these exceptions:

  1. Exception 1

java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887)

  2. Exception 2

Uncaught exception in kafka-coordinator-heartbeat-thread | topic1:
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887)

I googled it and tried the JVM parameters below, but the same exceptions still occurred:

-XX:MaxDirectMemorySize=768m

-Xms512m

How can I fix this issue? Is any other JVM parameter tuning required?

My Kafka consumer code is:

import com.mongodb.DBObject
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.errors.WakeupException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.regex.Pattern

class KafkaPollingConsumer implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(KafkaPollingConsumer.class)
private static final String TAG = "[KafkaPollingConsumer]"
private final KafkaConsumer<String, byte []> kafkaConsumer
private Map<TopicPartition,OffsetAndMetadata> currentOffsetsMap = new HashMap<>()
List topicNameList
Map kafkaTopicConfigMap = new HashMap<String,Object>()
Map kafkaTopicMessageListMap = new HashMap<String,List>()
Boolean isRebalancingTriggered = false
private final Long REBALANCING_SLEEP_TIME = 1000

public KafkaPollingConsumer(String serverType, String groupName, String topicNameRegex, Integer batchSize, Integer maxPollTime, Integer requestTime){
    logger.debug("{} [Constructor] [Enter] Thread Name {} serverType {} group Name {} TopicNameRegex {}",TAG,Thread.currentThread().getName(),serverType,groupName,topicNameRegex)
    logger.debug("Populating Property for kafka consumer")
    logger.debug("BatchSize {}",batchSize)
    Properties kafkaConsumerProperties = new Properties()
    kafkaConsumerProperties.put("group.id", groupName)
    kafkaConsumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaConsumerProperties.put("value.deserializer", "com.custom.kafkaconsumerv2.deserializer.CustomObjectDeserializer")
    switch(serverType){
        case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() :
            kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.priority.kafkaNode)
            kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.priority.consumer.enable.auto.commit)
            kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.priority.consumer.auto.offset.reset)
            break
        case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Bulk.toString() :
            kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.bulk.kafkaNode)
            kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.bulk.consumer.enable.auto.commit)
            kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.bulk.consumer.auto.offset.reset)
            kafkaConsumerProperties.put("max.poll.records",1)
            kafkaConsumerProperties.put("max.poll.interval.ms",600000)
            kafkaConsumerProperties.put("request.timeout.ms",600005)
            break
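        default :
            // A String cannot be thrown in Groovy; throw a real exception instead
            throw new IllegalArgumentException("Invalid server type: " + serverType)
    }
    logger.debug("{} [Constructor] KafkaConsumer Property Populated {}",TAG,kafkaConsumerProperties.toString())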
    kafkaConsumer = new KafkaConsumer<String, byte []>(kafkaConsumerProperties)
    topicNameList = topicNameRegex.split(Pattern.quote('|'))
    logger.debug("{} [Constructor] Kafkatopic List {}",TAG,topicNameList.toString())
    logger.debug("{} [Constructor] Exit",TAG)
}

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        logger.error('{} In onPartitionAssigned setting isRebalancingTriggered to false',TAG)
        isRebalancingTriggered = false
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        logger.error("{} In onPartitionsRevoked setting isRebalancingTriggered to true",TAG)
        isRebalancingTriggered = true
        publishAllKafkaTopicBatchMessages()
        commitOffset()

    }
}

private class AsyncCommitCallBack implements OffsetCommitCallback{

    @Override
    void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {

    }
}

@Override
void run() {
    logger.debug("{} Starting Thread ThreadName {}",TAG,Thread.currentThread().getName())
    populateKafkaConfigMap()
    initializeKafkaTopicMessageListMap()
    String topicName
    String consumerClassName
    String consumerMethodName
    Boolean isBatchJob
    Integer batchSize = 0
    final Thread mainThread = Thread.currentThread()
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            logger.error("{} gracefully shutting down thread {}",TAG,mainThread.getName())
            kafkaConsumer.wakeup()
            try {
                mainThread.join()
            } catch (InterruptedException exception) {
                logger.error("{} Error : {}",TAG,exception.getStackTrace().join("\n"))
            }
        }
    })
    kafkaConsumer.subscribe(topicNameList , new HandleRebalance())
    try{
        while(true){
            logger.debug("{} Starting Consumer with polling time in ms 100",TAG)
            ConsumerRecords kafkaRecords
            if(isRebalancingTriggered == false) {
                kafkaRecords = kafkaConsumer.poll(100)
            }
            else{
                logger.error("{} in rebalancing going to sleep",TAG)
                Thread.sleep(REBALANCING_SLEEP_TIME)
                continue
            }
            for(ConsumerRecord record: kafkaRecords){
                if(isRebalancingTriggered == true){
                    break
                }
                currentOffsetsMap.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() +1))
                topicName = record.topic()
                DBObject kafkaTopicConfigDBObject = kafkaTopicConfigMap.get(topicName)
                consumerClassName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                consumerMethodName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                isBatchJob = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY)
                logger.debug("Details about Message")
                logger.debug("Thread {}",mainThread.getName())
                logger.debug("Topic {}",topicName)
                logger.debug("Partition {}",record.partition().toString())
                logger.debug("Offset {}",record.offset().toString())
                logger.debug("clasName {}",consumerClassName)
                logger.debug("methodName {}",consumerMethodName)
                logger.debug("isBatchJob {}",isBatchJob.toString())
                Object message = record.value()
                logger.debug("message {}",message.toString())
                if(isBatchJob == true){
                    prepareMessagesBatch(topicName,message)
                    //batchSize = Integer.parseInt(kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY).toString())
                    //logger.debug("batchSize {}",batchSize.toString())
                }
                else{
                    publishMessageToNonBatchConsumer(consumerClassName,consumerMethodName,message)
                }
                //publishMessageToConsumers(consumerClassName,consumerMethodName,isBatchJob,batchSize,message,topicName)
                //try {
                //  kafkaConsumer.commitAsync(currentOffsetsMap,new AsyncCommitCallBack())
                logger.debug("{} Committing Messages to Kafka",TAG)
                //}
                /*catch(Exception exception){
                    kafkaConsumer.commitSync(currentOffsetsMap)
                    currentOffsetsMap.clear()
                    logger.error("{} Error while commiting async so commiting in sync {}",TAG,exception.getStackTrace().join("\n"))
                }*/
            }
            commitOffset()
            publishAllKafkaTopicBatchMessages()
        }
    }
    catch(InterruptException exception){
        logger.error("{} In InterruptException",TAG)
        logger.error("{} In Exception exception message {}",TAG,exception.getMessage())
        logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
    }
    catch (WakeupException exception) {
        logger.error("{} In WakeUp Exception",TAG)
        logger.error("{} In Exception exception message {}",TAG,exception.getMessage())
        logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
    }
    catch(Exception exception){
        logger.error("{} In Exception",TAG)
        logger.error("{} In Exception exception message {}",TAG,exception.getMessage())
        logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
    }
    finally {
        logger.error("{} In finally committing remaining offset ",TAG)
        publishAllKafkaTopicBatchMessages()
        //kafkaConsumer.commitSync(currentOffsetsMap)
        kafkaConsumer.close()
        logger.error("{} Exiting Consumer",TAG)
    }
}

private void commitOffset(){
    logger.debug("{} [commitOffset] Enter",TAG)
    logger.debug("{} currentOffsetMap {}",TAG,currentOffsetsMap.toString())
    if(currentOffsetsMap.size() > 0) {
        kafkaConsumer.commitSync(currentOffsetsMap)
        currentOffsetsMap.clear()
    }
    logger.debug("{} [commitOffset] Exit",TAG)

}

private void publishMessageToConsumers(String consumerClassName,String consumerMethodName,Boolean isBatchJob,Integer batchSize,Object message, String topicName){
    logger.debug("{} [publishMessageToConsumer] Enter",TAG)
    if(isBatchJob == true){
        publishMessageToBatchConsumer(consumerClassName, consumerMethodName,batchSize, message, topicName)
    }
    else{
        publishMessageToNonBatchConsumer(consumerClassName, consumerMethodName, message)
    }
    logger.debug("{} [publishMessageToConsumer] Exit",TAG)
}

private void publishMessageToNonBatchConsumer(String consumerClassName, String consumerMethodName, message){
    logger.debug("{} [publishMessageToNonBatchConsumer] Enter",TAG)
    executeConsumerMethod(consumerClassName,consumerMethodName,message)
    logger.debug("{} [publishMessageToNonBatchConsumer] Exit",TAG)
}

private void publishMessageToBatchConsumer(String consumerClassName, String consumerMethodName, Integer batchSize, Object message, String topicName){
    logger.debug("{} [publishMessageToBatchConsumer] Enter",TAG)
    List consumerMessageList = kafkaTopicMessageListMap.get(topicName)
    consumerMessageList.add(message)
    if(consumerMessageList.size() == batchSize){
        logger.debug("{} [publishMessageToBatchConsumer] Pushing Messages In Batches",TAG)
        executeConsumerMethod(consumerClassName, consumerMethodName, consumerMessageList)
        consumerMessageList.clear()
    }
    kafkaTopicMessageListMap.put(topicName,consumerMessageList)
    logger.debug("{} [publishMessageToBatchConsumer] Exit",TAG)
}

private void populateKafkaConfigMap(){
    logger.debug("{} [populateKafkaConfigMap] Enter",TAG)
    KafkaTopicConfigDBService kafkaTopicConfigDBService = KafkaTopicConfigDBService.getInstance()
    topicNameList.each { topicName ->
        DBObject kafkaTopicDBObject = kafkaTopicConfigDBService.findByTopicName(topicName)
        kafkaTopicConfigMap.put(topicName,kafkaTopicDBObject)
    }
    logger.debug("{} [populateKafkaConfigMap] kafkaConfigMap {}",TAG,kafkaTopicConfigMap.toString())
    logger.debug("{} [populateKafkaConfigMap] Exit",TAG)
}

private void initializeKafkaTopicMessageListMap(){
    logger.debug("{} [initializeKafkaTopicMessageListMap] Enter",TAG)
    topicNameList.each { topicName ->
        kafkaTopicMessageListMap.put(topicName,[])
    }
    logger.debug("{} [populateKafkaConfigMap] kafkaTopicMessageListMap {}",TAG,kafkaTopicMessageListMap.toString())
    logger.debug("{} [initializeKafkaTopicMessageListMap] Exit",TAG)
}

private void executeConsumerMethod(String className, String methodName, def messages){
    try{
        logger.debug("{} [executeConsumerMethod] Enter",TAG)
        logger.debug("{} [executeConsumerMethod] className  {} methodName {} messages {}",TAG,className,methodName,messages.toString())
        Class.forName(className)."$methodName"(messages)
    } catch (Exception exception){
        logger.error("{} [{}] Error while executing method : {} of class: {} with params : {} - {}", TAG, Thread.currentThread().getName(), methodName,
                className, messages.toString(), exception.getStackTrace().join("\n"))
    }
    logger.debug("{} [executeConsumerMethod] Exit",TAG)
}

private void publishAllKafkaTopicBatchMessages(){
    logger.debug("{} [publishAllKafkaTopicBatchMessages] Enter",TAG)
    String consumerClassName = null
    String consumerMethodName = null
    kafkaTopicMessageListMap.each { topicName, messageList ->
        if (messageList != null && messageList.size() > 0) {
            DBObject kafkaTopicDBObject = kafkaTopicConfigMap.get(topicName)
            consumerClassName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
            consumerMethodName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
            logger.debug("{} Pushing message in topic {} className {} methodName {} ", TAG, topicName, consumerClassName, consumerMethodName)
            if (messageList != null && messageList.size() > 0) {
                executeConsumerMethod(consumerClassName, consumerMethodName, messageList)
                messageList.clear()
                kafkaTopicMessageListMap.put(topicName, messageList)

            }
        }
    }
    logger.debug("{} [publishAllKafkaTopicBatchMessages] Exit",TAG)
}

private void prepareMessagesBatch(String topicName,Object message){
    logger.debug("{} [prepareMessagesBatch] Enter",TAG)
    logger.debug("{} [prepareMessagesBatch] preparing batch for topic {}",TAG,topicName)
    logger.debug("{} [prepareMessagesBatch] preparing batch for message {}",TAG,message.toString())
    List consumerMessageList = kafkaTopicMessageListMap.get(topicName)
    consumerMessageList.add(message)
    kafkaTopicMessageListMap.put(topicName,consumerMessageList)

}

}

Abhimanyu asked Mar 09 '23 13:03


1 Answer

A Kafka consumer controls how it works through a data backlog with the following two parameters:

max.poll.interval.ms
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
Default value is 300000 (milliseconds, i.e. 5 minutes).

max.poll.records
The maximum number of records returned in a single call to poll().
Default value is 500.
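
The two settings interact: everything returned by one poll() has to be processed before max.poll.interval.ms expires, so the per-record processing time limits how large max.poll.records can safely be. A rough sketch of that arithmetic in Java follows. The class PollBudget, the method maxRecordsPerPoll, the 200 ms per-record cost and the 0.5 safety factor are all assumptions made for illustration, not Kafka APIs or measured values.

```java
public class PollBudget {

    // Largest batch size whose processing fits inside a fraction of
    // max.poll.interval.ms. All inputs are estimates supplied by the caller.
    public static int maxRecordsPerPoll(long maxPollIntervalMs,
                                        long perRecordMs,
                                        double safetyFactor) {
        if (maxPollIntervalMs <= 0 || perRecordMs <= 0) {
            throw new IllegalArgumentException("durations must be positive");
        }
        if (safetyFactor <= 0.0 || safetyFactor > 1.0) {
            throw new IllegalArgumentException("safetyFactor must be in (0, 1]");
        }
        // Time we allow ourselves for one batch, leaving headroom for slow records
        long budgetMs = (long) (maxPollIntervalMs * safetyFactor);
        long records = budgetMs / perRecordMs;
        // Never go below 1 record per poll, and stay within int range
        return (int) Math.max(1L, Math.min(records, (long) Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        // Default interval of 300000 ms, assumed 200 ms per record, half the interval as budget
        System.out.println(maxRecordsPerPoll(300000L, 200L, 0.5)); // prints 750
    }
}
```

If a single record can take longer than the whole interval, the function returns 1, which is the signal to raise max.poll.interval.ms instead (the Bulk case in the question does this with 600000).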

Leaving these two parameters at their defaults can let a single poll() return more data than the consumer can handle with its available resources, which may lead to OutOfMemory errors or occasional failures to commit the consumer offset. Hence, it is advisable to set max.poll.records and max.poll.interval.ms explicitly, according to your requirements.

In your code, the case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() sets neither of these two parameters (the Bulk case sets both), which could possibly be the cause of the OutOfMemory problem during polling.
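
As a sketch of what setting both limits explicitly could look like, the Java snippet below builds the consumer properties by hand. It uses only java.util.Properties, so it runs without the Kafka client on the classpath. The class name BoundedConsumerProps, the broker address, the group name and the values 50 and 300000 are placeholders chosen for illustration, not tuned recommendations.

```java
import java.util.Properties;

public class BoundedConsumerProps {

    // Builds consumer properties with explicit poll limits.
    // Values are passed as strings, which the Kafka client parses for numeric configs.
    public static Properties build(String bootstrapServers, String groupId,
                                   int maxPollRecords, int maxPollIntervalMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Upper bound on records handed back by a single poll()
        props.put("max.poll.records", String.valueOf(maxPollRecords));
        // Maximum gap allowed between two poll() calls before a rebalance
        props.put("max.poll.interval.ms", String.valueOf(maxPollIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; pick real ones from measured processing times
        Properties props = build("localhost:9092", "priority-group", 50, 300000);
        System.out.println(props.getProperty("max.poll.records"));
        System.out.println(props.getProperty("max.poll.interval.ms"));
    }
}
```

In the question's code the equivalent would be two extra kafkaConsumerProperties.put(...) calls in the Priority case, mirroring what the Bulk case already does.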

Daniccan answered Apr 06 '23 17:04