We Are running a 5 node flink cluster (1.6.3) over kubernetes, with a 5 partitions Kafka topic source. 5 jobs are reading from that topic (with different consumer group), each with parallelism = 5.
Each task manager is running with 10Gb of ram and the task manager heap size is limited to 2Gb. The ingestion load is rather small (100-200 msgs per second) and an avg message size is ~4-8kb. all jobs are running fine for a few hours. after a duration we suddenly see one or more jobs failing on:
ava.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:666)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
at sun.nio.ch.IOUtil.read(IOUtil.java:195)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
flink restarts the job, but it keeps failing on that exception. we've tried reducing the record poll as suggested here: Kafka Consumers throwing java.lang.OutOfMemoryError: Direct buffer memory We also tried increasing kafka heap size as suggested here: Flink + Kafka, java.lang.OutOfMemoryError when parallelism > 1, although i'm failing to understand how failing to allocate memory in the flink process has anything to do with the jvm memory of the kafka broker process, and i don't see anything to indicate oom in the broker logs.
What might be the cause of that failure? what else should we check?
Thanks!
One thing that you may have underestimated, is that having a parallelism of 5, means there are 5+4+3+2+1=18 pairs of combinations. If we compare this to the linked thread, there were likely 3+2+1=6 combinations.
In the linked thread the problem was resolved by setting the max poll records to 250, hence my first thought would be to set it to 80 here (or even to 10) and see if that resolves the problem.
(I am not sure if the requirements are shaped this way, but the only noticable difference is the parallelism from 3 to 5 so that seems like a good point to compensate for).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With