How do I set the size of messages in Kafka?

I'm currently using Kafka 0.9.0.1. According to some sources I've found, the way to set the sizes of messages is to modify the following key values in server.properties.

  • message.max.bytes
  • replica.fetch.max.bytes
  • fetch.message.max.bytes

My server.properties file actually has these settings.

message.max.bytes=10485760
replica.fetch.max.bytes=20971520
fetch.message.max.bytes=10485760

Other settings that may be relevant are below.

socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

However, when I attempt to send messages with payloads of 4 to 6 MB in size, the consumer never gets any messages. The producer seems to send the messages without any exceptions being thrown. If I do send smaller payloads (like < 1 MB) then the consumer does actually receive the messages.

Any idea on what I'm doing wrong in terms of configuration settings?

Here is the example code to send a message.

Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps());
File dir = new File("/path/to/dir");
for(String s : dir.list()) {
  File f = new File(dir, s);
  byte[] data = Files.readAllBytes(f.toPath());
  Payload payload = new Payload(data); //a simple pojo to store payload
  String key = String.valueOf(System.currentTimeMillis());
  byte[] val = KryoUtil.toBytes(payload); //custom util to use kryo to get bytes[]
  producer.send(new ProducerRecord<>("test", key, val));
}
producer.close();

Here is the example code to receive a message.

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(getConsumerProps());
consumer.subscribe(Arrays.asList("test"));
while(true) {
  ConsumerRecords<String, byte[]> records = consumer.poll(100);
  for(ConsumerRecord<String, byte[]> record : records) {
    long offset = record.offset();
    String key = record.key();
    byte[] val = record.value();
    Payload payload = (Payload)KryoUtil.toObject(val, Payload.class); //custom util to use kryo to deserialize back to object
    System.out.println(
      String.format("offset=%d, key=%s", offset, key));
  }
}

Here are the methods that build the Properties objects for the producer and consumer.

public static Properties getProducerProps() {
  Properties props = new Properties();
  props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("compression.type", "snappy");
  props.put("max.request.size", 10485760); //need this
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
  return props;
}

public static Properties getConsumerProps() {
  Properties props = new Properties();
  props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
  props.put("group.id", "test");
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("max.partition.fetch.bytes", 10485760); //need this too
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  return props;
}
asked Feb 29 '16 by Jane Wayne

2 Answers

Jane, don't use fetch.message.max.bytes: first, because it is a consumer property and doesn't belong in the server.properties file, and second, because it applies to the old consumer. Instead, set max.partition.fetch.bytes in the properties you use when you instantiate the consumer.
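
To make the split explicit, here is a minimal sketch of where each setting belongs, using the sizes from the question (the consumer value mirrors what getConsumerProps() already sets; fetch.message.max.bytes is simply dropped from server.properties):

# server.properties (broker side)
message.max.bytes=10485760
replica.fetch.max.bytes=20971520

# consumer client properties (new consumer) -- not server.properties
max.partition.fetch.bytes=10485760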

answered by Nautilus


You need to increase the maximum message size on both the server side (as already described) and the client side.

Example in Python using kafka-python Producer:

producer = KafkaProducer(bootstrap_servers=brokers, max_request_size=1048576)

Increase max_request_size to the desired value; the default is 1048576.
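
On the consumer side, kafka-python has a matching setting. A minimal sketch, assuming the same brokers variable and the "test" topic from the question:

from kafka import KafkaConsumer

# must be at least as large as the biggest message the broker will deliver
consumer = KafkaConsumer('test',
                         bootstrap_servers=brokers,
                         max_partition_fetch_bytes=10485760)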

answered by Thiago Falcao