Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka Connect Out of Java heap space after enabling SSL

I have recently enabled SSL and tried to start Kafka connect in distributed mode. When running

connect-distributed connect-distributed.properties

I get the following errors:

[2018-10-09 16:50:57,190] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:106)
[2018-10-09 16:50:55,471] ERROR WorkerSinkTask{id=sink-mariadb-test} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:344)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:305)
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

and

java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:694)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
        at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
        at sun.nio.ch.IOUtil.read(IOUtil.java:195)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:344)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:305)
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I have also tried to increase max and initial heap size by setting the KAFKA_HEAP_OPTS environment variable by running

KAFKA_HEAP_OPTS="-Xms4g -Xmx6g" connect-distributed connect-distributed.properties

but still doesn't work.

My questions are:

  1. Can SSL authentication affect memory usage by any chance?
  2. How can I fix the issue?

EDIT:
I have tried to disable SSL and everything is working without any problems.

like image 916
Giorgos Myrianthous Avatar asked Oct 09 '18 14:10

Giorgos Myrianthous


People also ask

How do I fix out of memory heap space in Java?

OutOfMemoryError: Java heap space. 1) An easy way to solve OutOfMemoryError in java is to increase the maximum heap size by using JVM options "-Xmx512M", this will immediately solve your OutOfMemoryError.

What happens when Java runs out of heap space?

OutOfMemoryError is a runtime error in Java which occurs when the Java Virtual Machine (JVM) is unable to allocate an object due to insufficient space in the Java heap. The Java Garbage Collector (GC) cannot free up the space required for a new object, which causes a java. lang.

How do I fix a heap space error?

The java. lang. OutOfMemoryError: Java heap space error occurs when it attempts to add more data into the heap space area, but there is not enough room for it. The solution to fix this problem is to increase the heap space(Default value maybe 128 MB).

How do I increase Kafka heap size?

For increasing the heap size, set the following environment variable and restart Kafka. Kafka will check for KAFKA_HEAP_OPTS before it starts and if there is no value set for this variable, it assigns 512MB as the value, else it will pick up the configured value.


1 Answers

I ran into this issue when enabling SASL_SSL in Kafka Connect :

[2018-10-12 12:33:36,426] ERROR WorkerSinkTask{id=test-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172) 
java.lang.OutOfMemoryError: Java heap space

Checking ConsumerConfig values showed me that my configuration was not applied :

[2018-10-12 12:33:35,573] INFO ConsumerConfig values: 
...
security.protocol = PLAINTEXT

I found out that you have to prefix configs with producer. or consumer. in your properties file.

consumer.security.protocol=SASL_SSL
like image 192
Gery Avatar answered Sep 21 '22 16:09

Gery