I'm currently using Kafka 0.9.0.1. According to some sources I've found, the way to set the maximum message size is to modify the following key values in server.properties.
My server.properties file actually has these settings.
message.max.bytes=10485760
replica.fetch.max.bytes=20971520
fetch.message.max.bytes=10485760
Other settings that may be relevant are below.
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
However, when I attempt to send messages with payloads of 4 to 6 MB in size, the consumer never gets any messages. The producer seems to send the messages without any exceptions being thrown. If I do send smaller payloads (like < 1 MB) then the consumer does actually receive the messages.
Any idea on what I'm doing wrong in terms of configuration settings?
Here is the example code to send a message.
Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps());
File dir = new File("/path/to/dir");
for (String s : dir.list()) {
    File f = new File(dir, s);
    byte[] data = Files.readAllBytes(f.toPath());
    Payload payload = new Payload(data); // a simple POJO to store the payload
    String key = String.valueOf(System.currentTimeMillis());
    byte[] val = KryoUtil.toBytes(payload); // custom util to use Kryo to get a byte[]
    producer.send(new ProducerRecord<>("test", key, val));
}
producer.close();
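Note that send() is asynchronous: it returns a Future, and a failure such as a record exceeding max.request.size surfaces through that Future or a callback rather than as an exception thrown by send() itself. A minimal sketch (same producer and record as above) of attaching a callback to surface those errors:

producer.send(new ProducerRecord<>("test", key, val), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        // An error such as RecordTooLargeException shows up here,
        // not as a synchronous exception from send()
        if (e != null) {
            e.printStackTrace();
        }
    }
});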
Here is the example code to receive a message.
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(getConsumerProps());
consumer.subscribe(Arrays.asList("test"));
while (true) {
    ConsumerRecords<String, byte[]> records = consumer.poll(100);
    for (ConsumerRecord<String, byte[]> record : records) {
        long offset = record.offset();
        String key = record.key();
        byte[] val = record.value();
        Payload payload = (Payload) KryoUtil.toObject(val, Payload.class); // custom util to use Kryo to deserialize back to an object
        System.out.println(String.format("offset=%d, key=%s", offset, key));
    }
}
Here are the methods that populate the Properties objects for the producer and the consumer.
public static Properties getProducerProps() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("compression.type", "snappy");
    props.put("max.request.size", 10485760); // need this
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    return props;
}
public static Properties getConsumerProps() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("max.partition.fetch.bytes", 10485760); // need this too
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    return props;
}
Jane,
Don't use fetch.message.max.bytes: first, because it's a consumer property and doesn't belong in the server.properties file, and second, because it applies to the old version of the consumer. Instead, use max.partition.fetch.bytes in the properties you pass when you instantiate the Consumer.
You need to increase the limit on both the server side (as already described) and the client side.
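For example, a minimal sketch (reusing the broker addresses and the 10 MB limit from the question) of setting max.partition.fetch.bytes when the consumer is constructed:

Properties props = new Properties();
props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
props.put("group.id", "test");
// must be at least as large as the biggest message the brokers accept (message.max.bytes)
props.put("max.partition.fetch.bytes", 10485760);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);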
Example in Python using the kafka-python KafkaProducer:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=brokers, max_request_size=10485760)

Increase max_request_size to the desired value; the default is 1048576 (1 MB).