Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Get Kafka compressed message size

I would like to know the compressed size of a message in kafka.

I use kafka 1.1.0 and java kafka-connect 1.1.0 to send messages from my producer to a topic.

If the message is too large for my producer, I get a

The message is xxx bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

Setting max.request.size to a fitting value results in an error message from the broker as message.max.bytes has to be adjusted in the broker configuration accordingly as well. The error message does unfortunately not include the size of the message the broker received. I adjusted message.max.bytes. So far so good.

If I activate compression on the producer side, the max.request.size still has to be the same size as without compression as the code unfortunately compares the size of the non-compressed message before compressing it (see https://issues.apache.org/jira/browse/KAFKA-4169)

But with compression I would be able to decrease the message.max.bytes in the broker. The problem is that at no point I can determine the size of this compressed message. Is there any way to figure that out either in the producer code before sending the message or later in the log files?

In my case with compression the default value of 1MB for message.max.bytes is sufficient, so I don't have to alter the default configuration. But I would like to know if my compressed message is way below 1MB or maybe just 0.99MB. In that case I might increase message.max.bytes in production to avoid problems.

Thanks for your support in advance.

like image 587
user1451252 Avatar asked May 09 '18 10:05

user1451252


1 Answers

What you could do is use a compression library, compress the message yourself, check the size before sending it. For example assuming you are using lz4 compression, you can use lz4-java lib and then something like:

private static LZ4Compressor COMPRESS = LZ4Factory.fastestInstance().highCompressor();

String meMessageString      = "My Message that I am sending to kafka";
byte[] uncompressedBytes    = jsonRequest.getBytes();
long lz4compressedLength    = COMPRESSOR.compress(uncompressedBytes).length;
like image 144
CaptainHastings Avatar answered Nov 16 '22 20:11

CaptainHastings