Kafka Connect S3 Connector OutOfMemory errors with TimeBasedPartitioner

I'm currently working with the Kafka Connect S3 Sink Connector 3.3.1 to copy Kafka messages over to S3, and I'm running into OutOfMemory errors when processing late data.

I know it looks like a long question, but I tried my best to make it clear and simple to understand. I highly appreciate your help.

High level info

  • The connector does a simple byte-to-byte copy of the Kafka messages and prepends the length of each message to the byte array (for decompression purposes).
    • This is the role of the CustomByteArrayFormat class (see configs below)
  • The data is partitioned and bucketed according to the Record timestamp
    • The CustomTimeBasedPartitioner extends io.confluent.connect.storage.partitioner.TimeBasedPartitioner and its sole purpose is to override the generatePartitionedPath method to put the topic at the end of the path (a sketch of this override follows this list)
  • The total heap size of the Kafka Connect process is 24 GB (only one node)
  • The connector processes between 8,000 and 10,000 messages per second
  • Each message has a size close to 1 KB
  • The Kafka topic has 32 partitions
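
For reference, a minimal sketch of what this override can look like (illustrative only: the "/" delimiter is hard-coded here, while the real class should use the configured directory delimiter):

import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;

public class CustomTimeBasedPartitioner<T> extends TimeBasedPartitioner<T> {

    // The stock TimeBasedPartitioner builds paths as "<topic>/<encodedPartition>".
    // Moving the topic to the end yields paths like YYYY/MM/dd/HH/mm/topic-name.
    @Override
    public String generatePartitionedPath(String topic, String encodedPartition) {
        // "/" is hard-coded for illustration; use the configured delimiter in practice.
        return encodedPartition + "/" + topic;
    }
}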

Context of OutOfMemory errors

  • Those errors only happen when the connector has been down for several hours and has to catch up on data
  • When turning the connector back on, it begins to catch up but fails very quickly with OutOfMemory errors

Possible but incomplete explanation

  • The timestamp.extractor configuration of the connector is set to Record when those OOM errors happen
  • Switching this configuration to Wallclock (i.e. the time of the Kafka Connect process) DOES NOT throw OOM errors and all of the late data can be processed, but the late data is no longer correctly bucketed
    • All of the late data is bucketed in the YYYY/MM/dd/HH/mm/topic-name path corresponding to the time at which the connector was turned back on
  • So my guess is that while the connector is trying to correctly bucket the data according to the Record timestamp, it does too many parallel reads, leading to OOM errors
    • The "partition.duration.ms": "600000" parameter makes the connector bucket data into six 10-minute paths per hour (2018/06/20/12/[00|10|20|30|40|50] for 2018-06-20 at 12pm)
    • Thus, with 24h of late data, the connector would have to output data into 24h * 6 = 144 different S3 paths.
    • Each 10-minute folder contains 10,000 messages/sec * 600 seconds = 6,000,000 messages, for a size of 6 GB
    • If it does indeed read in parallel, that would make 864 GB of data going into memory (these numbers are recapped in the snippet after this list)
  • I think that I have to correctly configure a given set of parameters in order to avoid those OOM errors, but I don't feel like I see the big picture
    • The "flush.size": "100000" setting implies that once more than 100,000 messages have been read, they should be committed to files (and thus free memory)
      • With messages of 1 KB, this means committing every 100 MB
      • But even if there are 144 parallel reads, that would still only give a total of 14.4 GB, which is less than the 24 GB of heap size available
      • Is "flush.size" the number of records to read per partition before committing? Or maybe per connector task?
    • The way I understand the "rotate.schedule.interval.ms": "600000" config is that data is going to be committed every 10 minutes, even when the 100,000 messages of flush.size haven't been reached.
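
To make these back-of-envelope figures easy to re-check, here they are as a small snippet; the message rate and size are the rough estimates from above, not measured values:

public class BackOfEnvelope {
    public static void main(String[] args) {
        long messagesPerSecond = 10_000;   // upper bound of the observed throughput
        long messageSizeBytes = 1_000;     // each message is close to 1 KB
        long bucketSeconds = 600;          // partition.duration.ms = 600000 -> 10-minute buckets
        long lateHours = 24;               // hours of late data to catch up on

        long pathsToWrite = lateHours * 6;                          // 6 buckets/hour -> 144 paths
        long messagesPerBucket = messagesPerSecond * bucketSeconds; // 6,000,000 messages
        long bytesPerBucket = messagesPerBucket * messageSizeBytes; // ~6 GB per bucket
        long allBucketsBytes = pathsToWrite * bytesPerBucket;       // ~864 GB if all buffered at once

        long flushSizeBytes = 100_000 * messageSizeBytes;           // flush.size -> commit every ~100 MB

        System.out.printf("paths=%d, perBucket=%.1f GB, allBuckets=%.1f GB, flushChunk=%.1f MB%n",
                pathsToWrite, bytesPerBucket / 1e9, allBucketsBytes / 1e9, flushSizeBytes / 1e6);
    }
}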

My main question is: what is the math that lets me plan for memory usage, given:

  • the number of records per second
  • the size of the records
  • the number of Kafka partitions of the topics I read from
  • the number of Connector tasks (if this is relevant)
  • the number of buckets written to per hour (here 6 because of the "partition.duration.ms": "600000" config)
  • the maximum number of hours of late data to process

Configurations

S3 Sink Connector configurations

{
  "name": "xxxxxxx",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.region": "us-east-1",
    "partition.duration.ms": "600000",
    "topics.dir": "xxxxx",
    "flush.size": "100000",
    "schema.compatibility": "NONE",
    "topics": "xxxxxx,xxxxxx",
    "tasks.max": "16",
    "s3.part.size": "52428800",
    "timezone": "UTC",
    "locale": "en",
    "format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
    "partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "name": "xxxxxxxxx",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "xxxxxxx",
    "rotate.schedule.interval.ms": "600000",
    "path.format": "YYYY/MM/dd/HH/mm",
    "timestamp.extractor": "Record"
  }
}

Worker configurations

bootstrap.servers=XXXXXX
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
consumer.auto.offset.reset=earliest
consumer.max.partition.fetch.bytes=2097152
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
group.id=xxxxxxx
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
rest.advertised.host.name=XXXX

Edit:

I forgot to add an example of the errors I have:

[2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.OutOfMemoryError: Java heap space
[2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
[2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
asked Jun 21 '18 by raphael

1 Answer

I was finally able to understand how Heap Size usage works in the Kafka Connect S3 Connector.

  • The S3 Connector will write the data of each Kafka partition into partitioned paths
    • The way those paths are partitioned depends on the partitioner.class parameter;
    • By default, it partitions by timestamp, and the value of partition.duration.ms then determines the duration of each partitioned path.
  • The S3 Connector will allocate a buffer of s3.part.size bytes per Kafka partition (across all topics read) and per partitioned path
    • Example with 20 partitions read, a timestamp.extractor set to Record, partition.duration.ms set to 1h, s3.part.size set to 50 MB
      • The Heap Size needed each hour is then equal to 20 * 50 MB = 1 GB;
      • But, timestamp.extractor being set to Record, messages with a timestamp corresponding to an earlier hour than the one at which they are read will be buffered in that earlier hour's buffer. Therefore, in reality, the connector will need a minimum of 20 * 50 MB * 2h = 2 GB of memory, because there are always late events, and more if there are events with a lateness greater than 1 hour;
      • Note that this isn't true if timestamp.extractor is set to Wallclock, because there will virtually never be late events as far as Kafka Connect is concerned.
    • Those buffers are flushed (i.e. leave the memory) when one of 3 conditions is met
      • rotate.schedule.interval.ms time has passed
        • This flush condition is always triggered.
      • rotate.interval.ms time has passed in terms of timestamp.extractor time
        • This means that if timestamp.extractor is set to Record, 10 minutes of Record time can pass in more or less than 10 minutes of actual time
          • For instance, when processing late data, 10 minutes' worth of data will be processed in a few seconds, and if rotate.interval.ms is set to 10 minutes then this condition will trigger every second (as it should);
          • On the contrary, if there is a pause in the flow of events, this condition will not trigger until it sees an event with a timestamp showing that more than rotate.interval.ms has passed since the condition last triggered.
      • flush.size messages have been read in less than min(rotate.schedule.interval.ms, rotate.interval.ms)
        • As with rotate.interval.ms, this condition might never trigger if there are not enough messages.
    • Thus, you need to plan for at least Kafka partitions * s3.part.size of Heap Size (see the worked example below)
      • If you are using the Record timestamp for partitioning, you should multiply that by max lateness in milliseconds / partition.duration.ms
        • This is a worst-case scenario where you constantly have late events in all partitions, spanning the whole range of max lateness in milliseconds.
  • The S3 connector will also buffer consumer.max.partition.fetch.bytes bytes per partition when it reads from Kafka
    • In the worker configuration above, it is set to 2097152 bytes (about 2.1 MB).
  • Finally, you should not assume that all of the Heap Size is available to buffer Kafka messages, because it also holds a lot of other objects
    • A safe rule of thumb is to make sure that the buffering of Kafka messages does not go over 50% of the total available Heap Size.
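
To make the formula concrete, here is a rough sketch applied to the numbers from the question (32 Kafka partitions, s3.part.size of 50 MB, 10-minute buckets, up to 24 h of late data). It is a worst-case estimate under those assumptions, not a measurement of actual usage:

public class HeapEstimate {
    public static void main(String[] args) {
        int kafkaPartitions = 32;                   // partitions of the topics read
        long s3PartSizeBytes = 52_428_800L;         // s3.part.size (50 MB)
        long partitionDurationMs = 600_000L;        // partition.duration.ms (10-minute buckets)
        long maxLatenessMs = 24L * 60 * 60 * 1000;  // up to 24 h of late data
        long fetchBytesPerPartition = 2_097_152L;   // consumer.max.partition.fetch.bytes

        // Minimum: one s3.part.size buffer per Kafka partition.
        long baseline = kafkaPartitions * s3PartSizeBytes;

        // Worst case with timestamp.extractor = Record: one buffer per partition
        // and per time bucket still being written, i.e. maxLateness / partition.duration.
        long bucketsInFlight = maxLatenessMs / partitionDurationMs;   // 144
        long worstCase = baseline * bucketsInFlight;

        // Consumer fetch buffers are small in comparison.
        long fetchBuffers = kafkaPartitions * fetchBytesPerPartition;

        // Prints roughly: baseline=1.7 GB, worstCase=241.6 GB, fetch=67.1 MB
        // With the 50% rule of thumb, a 24 GB heap cannot absorb this worst case.
        System.out.printf("baseline=%.1f GB, worstCase=%.1f GB, fetch=%.1f MB%n",
                baseline / 1e9, worstCase / 1e9, fetchBuffers / 1e6);
    }
}
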
answered Sep 30 '22 by raphael