Kafka is confusing me. I am running it locally with default settings, only automatic topic creation turned on: 1 partition, 1 node, everything local and simple. If I write
consumer.subscribe(Arrays.asList("test_topic"));
consumer.poll(10);
It simply won't work and never finds any data. If I instead assign a partition like
consumer.assign(new TopicPartition("test_topic",0));
and check the position, I sit at offset 995 and can now poll and receive all the data my producer put in.
What is it that I don't understand about subscriptions? I don't need multiple consumers, each handling only a part of the data; my consumer needs to get all the data of a certain topic. Why does the standard subscription approach shown in all the tutorials not work for me? I do understand that partitions are for load balancing consumers. I just don't understand what I am doing wrong with the subscription.
consumer config properties
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "postproc-" + EnvUtils.getAppInst()); // jeder ist eine eigene gruppe -> kriegt alles
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<Long, byte[]> consumer = new KafkaConsumer<>(props);
producer config
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 2);
props.put("batch.size", 16384);
props.put("linger.ms", 5000);
props.put("buffer.memory", 1024 * 1024 * 10); // 10mb
props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<>(props);
producer execution
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
    event.writeDelimitedTo(out);
    for (long a = 10; a < 20; a++) {
        long rand = new Random(a).nextLong();
        producer.send(new ProducerRecord<>("test_topic", rand, out.toByteArray()));
    }
    producer.flush();
} catch (IOException e) {
    // handle/log the exception
}
consumer execution
consumer.subscribe(Arrays.asList("test_topic"));
ConsumerRecords<Long, byte[]> records = consumer.poll(10);
for (ConsumerRecord<Long, byte[]> r : records) { ...
I managed to solve the issue. The problem was timeouts. When polling, I didn't give it enough time to complete. I assume assigning a partition is simply a lot faster and therefore completed in time, while the subscription-based poll takes longer, never actually finished, and did not commit. At least I think that was the problem. With longer timeouts it works.
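For anyone hitting the same thing, here is a rough sketch of what the working version looks like (using the consumer from the question's config; 1000 ms is just a value that worked for me, not a recommendation). The first poll after subscribe() also has to join the consumer group and wait for partitions to be assigned, so a 10 ms timeout tends to expire before any records show up:
consumer.subscribe(Arrays.asList("test_topic"));
while (true) {
    // a longer timeout than 10 ms; the first iterations may still return
    // empty record sets while the group rebalance completes
    ConsumerRecords<Long, byte[]> records = consumer.poll(1000);
    for (ConsumerRecord<Long, byte[]> r : records) {
        // process r.key() / r.value()
    }
}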
I think you are missing this property:
auto.offset.reset=earliest
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
- earliest: automatically reset the offset to the earliest offset
- latest: automatically reset the offset to the latest offset
- none: throw exception to the consumer if no previous offset is found for the consumer's group
- anything else: throw exception to the consumer.
Reference: http://kafka.apache.org/documentation.html#highlevelconsumerapi
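In the question's consumer config that would be one extra line, e.g. (a sketch; the property name and value are exactly the ones documented above):
props.put("auto.offset.reset", "earliest"); // start from the beginning when the group has no committed offset yet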