I am using a single-node Kafka broker (0.10.2) and a single-node ZooKeeper (3.4.9). My consumer server has a single core and 1.5 GB of RAM. Whenever I run a process with 5 or more threads, my consumer's threads get killed after throwing these exceptions:
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887)
Uncaught exception in kafka-coordinator-heartbeat-thread | topic1: java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887)
I googled it and tried the JVM parameters below, but the same exceptions still occurred:
-XX:MaxDirectMemorySize=768m
-Xms512m
How can I fix this issue? Is any other JVM parameter tuning required?
My Kafka consumer code is:
import com.mongodb.DBObject
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.errors.WakeupException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.regex.Pattern
class KafkaPollingConsumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaPollingConsumer.class)
    private static final String TAG = "[KafkaPollingConsumer]"
    private final KafkaConsumer<String, byte []> kafkaConsumer
    // Offsets processed since the last commit, keyed by partition.
    private Map<TopicPartition,OffsetAndMetadata> currentOffsetsMap = new HashMap<>()
    List topicNameList
    Map kafkaTopicConfigMap = new HashMap<String,Object>()
    Map kafkaTopicMessageListMap = new HashMap<String,List>()
    Boolean isRebalancingTriggered = false
    private final Long REBALANCING_SLEEP_TIME = 1000

    public KafkaPollingConsumer(String serverType, String groupName, String topicNameRegex, Integer batchSize, Integer maxPollTime, Integer requestTime){
        logger.debug("{} [Constructor] [Enter] Thread Name {} serverType {} group Name {} TopicNameRegex {}",TAG,Thread.currentThread().getName(),serverType,groupName,topicNameRegex)
        logger.debug("Populating Property for kafka consumer")
        logger.debug("BatchSize {}",batchSize)
        Properties kafkaConsumerProperties = new Properties()
        kafkaConsumerProperties.put("group.id", groupName)
        kafkaConsumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        kafkaConsumerProperties.put("value.deserializer", "com.custom.kafkaconsumerv2.deserializer.CustomObjectDeserializer")
        switch(serverType){
            case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() :
                kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.priority.kafkaNode)
                kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.priority.consumer.enable.auto.commit)
                kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.priority.consumer.auto.offset.reset)
                break
            case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Bulk.toString() :
                kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.bulk.kafkaNode)
                kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.bulk.consumer.enable.auto.commit)
                kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.bulk.consumer.auto.offset.reset)
                kafkaConsumerProperties.put("max.poll.records",1)
                kafkaConsumerProperties.put("max.poll.interval.ms",600000)
                kafkaConsumerProperties.put("request.timeout.ms",600005)
                break
            default :
                // Only a Throwable can be thrown; a bare String would fail at runtime.
                throw new IllegalArgumentException("Invalid server type")
        }
        logger.debug("{} [Constructor] KafkaConsumer Property Populated {}",TAG,kafkaConsumerProperties.toString())
        kafkaConsumer = new KafkaConsumer<String, byte []>(kafkaConsumerProperties)
        // topicNameRegex is a '|'-separated list of topic names.
        topicNameList = topicNameRegex.split(Pattern.quote('|'))
        logger.debug("{} [Constructor] Kafkatopic List {}",TAG,topicNameList.toString())
        logger.debug("{} [Constructor] Exit",TAG)
    }

    // Flushes pending batches and commits offsets before partitions are taken away.
    private class HandleRebalance implements ConsumerRebalanceListener {
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            logger.error('{} In onPartitionsAssigned setting isRebalancingTriggered to false',TAG)
            isRebalancingTriggered = false
        }
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            logger.error("{} In onPartitionsRevoked setting isRebalancingTriggered to true",TAG)
            isRebalancingTriggered = true
            publishAllKafkaTopicBatchMessages()
            commitOffset()
        }
    }

    // No-op callback kept for the (currently commented-out) asynchronous commit.
    private class AsyncCommitCallBack implements OffsetCommitCallback{
        @Override
        void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
        }
    }

    @Override
    void run() {
        logger.debug("{} Starting Thread ThreadName {}",TAG,Thread.currentThread().getName())
        populateKafkaConfigMap()
        initializeKafkaTopicMessageListMap()
        String topicName
        String consumerClassName
        String consumerMethodName
        Boolean isBatchJob
        Integer batchSize = 0
        final Thread mainThread = Thread.currentThread()
        // On JVM shutdown, wake the consumer out of poll() and wait for this thread to finish.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                logger.error("{} gracefully shutting down thread {}",TAG,mainThread.getName())
                kafkaConsumer.wakeup()
                try {
                    mainThread.join()
                } catch (InterruptedException exception) {
                    logger.error("{} Error : {}",TAG,exception.getStackTrace().join("\n"))
                }
            }
        })
        kafkaConsumer.subscribe(topicNameList , new HandleRebalance())
        try{
            while(true){
                logger.debug("{} Starting Consumer with polling time in ms 100",TAG)
                ConsumerRecords kafkaRecords
                if(isRebalancingTriggered == false) {
                    kafkaRecords = kafkaConsumer.poll(100)
                }
                else{
                    logger.error("{} in rebalancing going to sleep",TAG)
                    Thread.sleep(REBALANCING_SLEEP_TIME)
                    continue
                }
                for(ConsumerRecord record: kafkaRecords){
                    if(isRebalancingTriggered == true){
                        break
                    }
                    // Track the next offset to commit for this record's partition.
                    currentOffsetsMap.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() +1))
                    topicName = record.topic()
                    DBObject kafkaTopicConfigDBObject = kafkaTopicConfigMap.get(topicName)
                    consumerClassName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                    consumerMethodName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                    isBatchJob = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY)
                    logger.debug("Details about Message")
                    logger.debug("Thread {}",mainThread.getName())
                    logger.debug("Topic {}",topicName)
                    logger.debug("Partition {}",record.partition().toString())
                    logger.debug("Offset {}",record.offset().toString())
                    logger.debug("className {}",consumerClassName)
                    logger.debug("methodName {}",consumerMethodName)
                    logger.debug("isBatchJob {}",isBatchJob.toString())
                    Object message = record.value()
                    logger.debug("message {}",message.toString())
                    if(isBatchJob == true){
                        prepareMessagesBatch(topicName,message)
                        //batchSize = Integer.parseInt(kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY).toString())
                        //logger.debug("batchSize {}",batchSize.toString())
                    }
                    else{
                        publishMessageToNonBatchConsumer(consumerClassName,consumerMethodName,message)
                    }
                    //publishMessageToConsumers(consumerClassName,consumerMethodName,isBatchJob,batchSize,message,topicName)
                    //try {
                    //    kafkaConsumer.commitAsync(currentOffsetsMap,new AsyncCommitCallBack())
                    logger.debug("{} Committing Messages to Kafka",TAG)
                    //}
                    /*catch(Exception exception){
                        kafkaConsumer.commitSync(currentOffsetsMap)
                        currentOffsetsMap.clear()
                        logger.error("{} Error while committing async so committing in sync {}",TAG,exception.getStackTrace().join("\n"))
                    }*/
                }
                commitOffset()
                publishAllKafkaTopicBatchMessages()
            }
        }
        catch(InterruptException exception){
            logger.error("{} In InterruptException",TAG)
            logger.error("{} In Exception exception message {}",TAG,exception.getMessage())
            logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
        }
        catch (WakeupException exception) {
            // Thrown by poll() after the shutdown hook calls wakeup().
            logger.error("{} In WakeUp Exception",TAG)
            logger.error("{} In Exception exception message {}",TAG,exception.getMessage())
            logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
        }
        catch(Exception exception){
            logger.error("{} In Exception",TAG)
            logger.error("{} In Exception exception message {}",TAG,exception.getMessage())
            logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
        }
        finally {
            logger.error("{} In finally committing remaining offset ",TAG)
            publishAllKafkaTopicBatchMessages()
            //kafkaConsumer.commitSync(currentOffsetsMap)
            kafkaConsumer.close()
            logger.error("{} Exiting Consumer",TAG)
        }
    }

    // Synchronously commits the tracked offsets, then resets the tracking map.
    private void commitOffset(){
        logger.debug("{} [commitOffset] Enter",TAG)
        logger.debug("{} currentOffsetMap {}",TAG,currentOffsetsMap.toString())
        if(currentOffsetsMap.size() > 0) {
            kafkaConsumer.commitSync(currentOffsetsMap)
            currentOffsetsMap.clear()
        }
        logger.debug("{} [commitOffset] Exit",TAG)
    }

    private void publishMessageToConsumers(String consumerClassName,String consumerMethodName,Boolean isBatchJob,Integer batchSize,Object message, String topicName){
        logger.debug("{} [publishMessageToConsumer] Enter",TAG)
        if(isBatchJob == true){
            publishMessageToBatchConsumer(consumerClassName, consumerMethodName,batchSize, message, topicName)
        }
        else{
            publishMessageToNonBatchConsumer(consumerClassName, consumerMethodName, message)
        }
        logger.debug("{} [publishMessageToConsumer] Exit",TAG)
    }

    private void publishMessageToNonBatchConsumer(String consumerClassName, String consumerMethodName, message){
        logger.debug("{} [publishMessageToNonBatchConsumer] Enter",TAG)
        executeConsumerMethod(consumerClassName,consumerMethodName,message)
        logger.debug("{} [publishMessageToNonBatchConsumer] Exit",TAG)
    }

    private void publishMessageToBatchConsumer(String consumerClassName, String consumerMethodName, Integer batchSize, Object message, String topicName){
        logger.debug("{} [publishMessageToBatchConsumer] Enter",TAG)
        List consumerMessageList = kafkaTopicMessageListMap.get(topicName)
        consumerMessageList.add(message)
        if(consumerMessageList.size() == batchSize){
            logger.debug("{} [publishMessageToBatchConsumer] Pushing Messages In Batches",TAG)
            executeConsumerMethod(consumerClassName, consumerMethodName, consumerMessageList)
            consumerMessageList.clear()
        }
        kafkaTopicMessageListMap.put(topicName,consumerMessageList)
        logger.debug("{} [publishMessageToBatchConsumer] Exit",TAG)
    }

    // Loads the per-topic handler configuration from the database.
    private void populateKafkaConfigMap(){
        logger.debug("{} [populateKafkaConfigMap] Enter",TAG)
        KafkaTopicConfigDBService kafkaTopicConfigDBService = KafkaTopicConfigDBService.getInstance()
        topicNameList.each { topicName ->
            DBObject kafkaTopicDBObject = kafkaTopicConfigDBService.findByTopicName(topicName)
            kafkaTopicConfigMap.put(topicName,kafkaTopicDBObject)
        }
        logger.debug("{} [populateKafkaConfigMap] kafkaConfigMap {}",TAG,kafkaTopicConfigMap.toString())
        logger.debug("{} [populateKafkaConfigMap] Exit",TAG)
    }

    private void initializeKafkaTopicMessageListMap(){
        logger.debug("{} [initializeKafkaTopicMessageListMap] Enter",TAG)
        topicNameList.each { topicName ->
            kafkaTopicMessageListMap.put(topicName,[])
        }
        logger.debug("{} [initializeKafkaTopicMessageListMap] kafkaTopicMessageListMap {}",TAG,kafkaTopicMessageListMap.toString())
        logger.debug("{} [initializeKafkaTopicMessageListMap] Exit",TAG)
    }

    // Invokes the static handler method named in the topic configuration.
    private void executeConsumerMethod(String className, String methodName, def messages){
        try{
            logger.debug("{} [executeConsumerMethod] Enter",TAG)
            logger.debug("{} [executeConsumerMethod] className {} methodName {} messages {}",TAG,className,methodName,messages.toString())
            Class.forName(className)."$methodName"(messages)
        } catch (Exception exception){
            logger.error("{} [{}] Error while executing method : {} of class: {} with params : {} - {}", TAG, Thread.currentThread().getName(), methodName,
                    className, messages.toString(), exception.getStackTrace().join("\n"))
        }
        logger.debug("{} [executeConsumerMethod] Exit",TAG)
    }

    // Hands every non-empty per-topic batch to its handler and clears it.
    private void publishAllKafkaTopicBatchMessages(){
        logger.debug("{} [publishAllKafkaTopicBatchMessages] Enter",TAG)
        String consumerClassName = null
        String consumerMethodName = null
        kafkaTopicMessageListMap.each { topicName, messageList ->
            if (messageList != null && messageList.size() > 0) {
                DBObject kafkaTopicDBObject = kafkaTopicConfigMap.get(topicName)
                consumerClassName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                consumerMethodName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                logger.debug("{} Pushing message in topic {} className {} methodName {} ", TAG, topicName, consumerClassName, consumerMethodName)
                executeConsumerMethod(consumerClassName, consumerMethodName, messageList)
                messageList.clear()
                kafkaTopicMessageListMap.put(topicName, messageList)
            }
        }
        logger.debug("{} [publishAllKafkaTopicBatchMessages] Exit",TAG)
    }

    private void prepareMessagesBatch(String topicName,Object message){
        logger.debug("{} [prepareMessagesBatch] Enter",TAG)
        logger.debug("{} [prepareMessagesBatch] preparing batch for topic {}",TAG,topicName)
        logger.debug("{} [prepareMessagesBatch] preparing batch for message {}",TAG,message.toString())
        List consumerMessageList = kafkaTopicMessageListMap.get(topicName)
        consumerMessageList.add(message)
        kafkaTopicMessageListMap.put(topicName,consumerMessageList)
    }
}
A Kafka consumer limits how much of the data backlog it takes on at a time through the following two parameters:
max.poll.interval.ms
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
Default value is 300000 ms (5 minutes).
max.poll.records
The maximum number of records returned in a single call to poll().
Default value is 500.
If these two parameters are not set to match the workload, a single poll() can return more data than the consumer can handle with its available resources, which can lead to an OutOfMemoryError or, at times, to a failure to commit the consumer offsets. It is therefore advisable to set max.poll.records and max.poll.interval.ms explicitly.
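To put rough numbers on this, here is a minimal back-of-the-envelope sketch in Java. The class name, the method name, and the 100 KB average record size are hypothetical assumptions made for illustration; only the thread count (5) and the two max.poll.records values (the default 500 and the 1 used in the Bulk case) come from this thread. It counts only the records handed back by poll(), not the client's internal fetch buffers, so treat the result as an order-of-magnitude estimate rather than a measurement.

```java
public class PollMemoryEstimate {
    // Rough estimate of the heap held by records returned from poll() across
    // all consumer threads. It ignores the client's internal fetch buffers and
    // any copies made during deserialization or processing.
    static long estimateBytes(int threads, int maxPollRecords, long avgRecordBytes) {
        return (long) threads * maxPollRecords * avgRecordBytes;
    }

    public static void main(String[] args) {
        long avgRecordBytes = 100L * 1024L; // hypothetical: 100 KB per record
        int threads = 5;                    // thread count from the question

        // Default max.poll.records (500) versus the Bulk setting in the question (1).
        long withDefault = estimateBytes(threads, 500, avgRecordBytes);
        long withOne = estimateBytes(threads, 1, avgRecordBytes);

        System.out.println("max.poll.records=500: " + withDefault / 1024 + " KB");
        System.out.println("max.poll.records=1:   " + withOne / 1024 + " KB");
    }
}
```

Under these assumed numbers, five threads with the default setting could hold about 250,000 KB (roughly 244 MB) of polled records at once, against 500 KB with max.poll.records set to 1. On a 1.5 GB machine that difference can matter, although the real figure depends on the actual message sizes.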
In your code, the case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() does not set these two parameters, which could be the cause of the OutOfMemoryError during polling.
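One way to avoid forgetting the limits in one branch is to apply them through a single helper. The following is a minimal sketch in plain Java that uses only java.util.Properties; the class name, the helper name, the group name, the broker address, and the values 50 and 300000 are all hypothetical placeholders, not tuned recommendations. The request.timeout.ms line simply mirrors the Bulk case in the question, which sets it 5 ms above max.poll.interval.ms.

```java
import java.util.Properties;

public class ConsumerPollLimits {
    // Returns a copy of the base properties with explicit poll limits added,
    // so that every server type gets them regardless of the switch branch.
    static Properties withPollLimits(Properties base, int maxPollRecords, int maxPollIntervalMs) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("max.poll.records", maxPollRecords);
        props.put("max.poll.interval.ms", maxPollIntervalMs);
        // Mirrors the Bulk case in the question: 5 ms above max.poll.interval.ms.
        props.put("request.timeout.ms", maxPollIntervalMs + 5);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("group.id", "example-group");           // hypothetical group name
        base.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address

        // Hypothetical limits for the Priority case; adjust them to the workload.
        Properties priority = withPollLimits(base, 50, 300000);
        System.out.println(priority);
        // With kafka-clients on the classpath, these properties (plus the
        // deserializer settings) could then be passed to the KafkaConsumer constructor.
    }
}
```

In the question's constructor, the same idea would amount to adding the max.poll.records and max.poll.interval.ms puts to the Priority case, or moving them above the switch so both cases share them.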