I'm currently working with the Kafka Connect S3 Sink Connector 3.3.1 to copy Kafka messages over to S3, and I am running into OutOfMemory errors when it has to process late data.
I know it looks like a long question, but I tried my best to make it clear and simple to understand. I highly appreciate your help.
The connector writes the messages using the CustomByteArrayFormat class (see configs below), and the data is partitioned and bucketed according to the Record timestamp: the CustomTimeBasedPartitioner extends the io.confluent.connect.storage.partitioner.TimeBasedPartitioner and its sole purpose is to override the generatePartitionedPath method to put the topic at the end of the path.
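The class itself isn't shown here, but a minimal sketch of what such a partitioner could look like, assuming the Partitioner API of kafka-connect-storage-common 3.3.x (the "/" delimiter is hard-coded for brevity; the real class presumably reuses the configured directory delimiter):

import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;

// Hypothetical reconstruction of the CustomTimeBasedPartitioner described above:
// keep TimeBasedPartitioner's time-based bucketing, but emit
// "<YYYY/MM/dd/HH/mm>/<topic>" instead of the default "<topic>/<YYYY/MM/dd/HH/mm>".
public class CustomTimeBasedPartitioner<T> extends TimeBasedPartitioner<T> {

    @Override
    public String generatePartitionedPath(String topic, String encodedPartition) {
        // The stock implementation returns the topic first, then the encoded time path;
        // here the topic is appended after the time-based path instead.
        return encodedPartition + "/" + topic;
    }
}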
The timestamp.extractor configuration of the connector is set to Record when those OOM errors happen. Switching it to Wallclock (i.e. the time of the Kafka Connect process) does NOT throw OOM errors and all of the late data can be processed, but the late data is no longer correctly bucketed: everything ends up in the YYYY/MM/dd/HH/mm/topic-name path of the time at which the connector was turned back on.

So my guess is that, while the connector correctly buckets data according to the Record timestamp, it does too much parallel reading, leading to the OOM errors.
"partition.duration.ms": "600000"
parameter make the connector bucket data in six 10 minutes paths per hour (2018/06/20/12/[00|10|20|30|40|50]
for 2018-06-20 at 12pm)24h * 6 = 144
different S3 paths."flush.size": "100000"
imply that if there is more dans 100,000 messages read, they should be committed to files (and thus free memory)
"flush.size"
the number of record to read per partition before committing? Or maybe per connector's task?"rotate.schedule.interval.ms": "600000"
config is that data is going to be committed every 10 minutes even when the 100,000 messages of flush.size
haven't been reached.My main question would be what are the maths allowing me to plan for memory usage given:
"partition.duration.ms": "600000"
config)S3 Sink Connector configurations
{
"name": "xxxxxxx",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "us-east-1",
"partition.duration.ms": "600000",
"topics.dir": "xxxxx",
"flush.size": "100000",
"schema.compatibility": "NONE",
"topics": "xxxxxx,xxxxxx",
"tasks.max": "16",
"s3.part.size": "52428800",
"timezone": "UTC",
"locale": "en",
"format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
"partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"name": "xxxxxxxxx",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "xxxxxxx",
"rotate.schedule.interval.ms": "600000",
"path.format": "YYYY/MM/dd/HH/mm",
"timestamp.extractor": "Record"
  }
}
Worker configurations
bootstrap.servers=XXXXXX
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
consumer.auto.offset.reset=earliest
consumer.max.partition.fetch.bytes=2097152
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
group.id=xxxxxxx
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
rest.advertised.host.name=XXXX
Edit:
I forgot to add an example of the errors I have:
[2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.OutOfMemoryError: Java heap space
[2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
[2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I was finally able to understand how the Heap Size usage works in the Kafka Connect S3 Connector:

The S3 Connector writes the data of each Kafka partition into partitioned paths. The way those paths are partitioned depends on the partitioner.class parameter; with a time-based partitioner, partition.duration.ms then determines the duration of each partitioned path. The S3 Connector allocates a buffer of s3.part.size bytes per Kafka partition (for all topics read) and per partitioned path.
Example: with 20 Kafka partitions read, timestamp.extractor set to Record, partition.duration.ms set to 1h and s3.part.size set to 50 MB, the Heap Size needed each hour is 20 * 50 MB = 1 GB. But, timestamp.extractor being set to Record, messages having a timestamp corresponding to an earlier hour than the one at which they are read will be buffered in that earlier hour's buffer. Therefore, in reality, the connector will need a minimum of 20 * 50 MB * 2h = 2 GB of memory, because there are always late events, and more if there are events with a lateness greater than 1 hour. Note that this is not true if timestamp.extractor is set to Wallclock, because there will virtually never be late events as far as Kafka Connect is concerned.
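As a quick sanity check of that example (throwaway arithmetic, not connector code; the numbers are just the ones from the example above):

public class HeapEstimateExample {
    public static void main(String[] args) {
        long s3PartSizeBytes = 50L * 1024 * 1024; // s3.part.size = 50 MB per buffer
        int kafkaPartitions = 20;                 // Kafka partitions read in the example
        int openHourBuckets = 2;                  // current hour + one hour of late data

        long minBufferBytes = (long) kafkaPartitions * s3PartSizeBytes * openHourBuckets;
        // Prints roughly 2.0 GB, matching the 20 * 50 MB * 2h estimate above.
        System.out.printf("Minimum buffer memory: %.1f GB%n",
                minBufferBytes / (1024.0 * 1024 * 1024));
    }
}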
Those buffers are flushed (committed to S3) when one of three conditions is met:

- rotate.schedule.interval.ms time has passed;
- rotate.interval.ms time has passed in terms of timestamp.extractor time. This means that, if timestamp.extractor is set to Record, 10 minutes of Record time can pass in more or less than 10 minutes of actual time: when processing late data, 10 minutes' worth of records is read in a few seconds, so if rotate.interval.ms is set to 10 minutes then this condition will trigger every second (as it should); conversely, the condition will not trigger again until it sees an event showing that more than rotate.interval.ms of Record time has passed since the condition last triggered;
- flush.size messages have been read in less than min(rotate.schedule.interval.ms, rotate.interval.ms). As with rotate.interval.ms, this condition might never trigger if there are not enough messages.
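Restated as code, those three conditions look roughly like this (a simplified sketch of the rules above, not the connector's actual implementation; the method and parameter names are made up for illustration):

final class CommitConditionsSketch {
    static boolean shouldCommit(long recordsBuffered, long flushSize,
                                long wallclockMsSinceLastScheduledRotation, long rotateScheduleIntervalMs,
                                long extractorMsSinceLastRotation, long rotateIntervalMs) {
        // 1) rotate.schedule.interval.ms of wallclock time has passed.
        boolean scheduledRotation = rotateScheduleIntervalMs > 0
                && wallclockMsSinceLastScheduledRotation >= rotateScheduleIntervalMs;
        // 2) rotate.interval.ms has passed in terms of timestamp.extractor time.
        boolean timestampRotation = rotateIntervalMs > 0
                && extractorMsSinceLastRotation >= rotateIntervalMs;
        // 3) flush.size records have been buffered before either rotation interval elapsed.
        boolean sizeRotation = recordsBuffered >= flushSize;
        return scheduledRotation || timestampRotation || sizeRotation;
    }
}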
Thus, you need to plan for Kafka partitions * s3.part.size of Heap Size at the very least. If you are using the Record timestamp for partitioning, you should multiply that by max lateness in milliseconds / partition.duration.ms, which is the worst case where you constantly have events late by up to the max lateness in milliseconds. The S3 Connector will also buffer consumer.max.partition.fetch.bytes bytes per partition when it reads from Kafka.
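Putting the above together for this connector's configuration, a rough planning calculation (the Kafka partition count and the expected lateness are assumptions, since they depend on your topics; everything else comes from the configs above):

public class S3SinkHeapPlan {
    public static void main(String[] args) {
        long s3PartSize = 52_428_800L;             // "s3.part.size": 50 MB buffer per partition and per path
        long partitionDurationMs = 600_000L;       // "partition.duration.ms": 10-minute buckets
        long consumerFetchBytes = 2_097_152L;      // consumer.max.partition.fetch.bytes (worker config)
        int kafkaPartitions = 32;                  // assumption: total partitions across the topics read
        long maxLatenessMs = 24L * 60 * 60 * 1000; // assumption: up to 24h of late data to catch up on

        // At the very least: one s3.part.size buffer per Kafka partition.
        long baseline = kafkaPartitions * s3PartSize;

        // Worst case with Record timestamps: one open bucket per partition.duration.ms
        // window covered by the lateness, plus the consumer fetch buffers.
        long openBuckets = maxLatenessMs / partitionDurationMs; // 144 ten-minute buckets
        long worstCase = baseline * openBuckets + (long) kafkaPartitions * consumerFetchBytes;

        System.out.printf("Baseline: %.1f GB, worst case with 24h of lateness: %.1f GB%n",
                baseline / 1e9, worstCase / 1e9);
    }
}

With numbers like these, a heap that comfortably covers the baseline can still be far too small once a day of Record-time lateness has to be caught up, which is consistent with the OOM errors described in the question.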