
Kafka Consumer subscription vs. assigned partition

Kafka is confusing me. I am running it locally with default settings, only automatic topic creation turned on: 1 partition, 1 node, everything local and simple. If I write

consumer.subscribe(Arrays.asList("test_topic"));
consumer.poll(10);

It simply won't work and never finds any data. If I instead assign a partition like

consumer.assign(new TopicPartition("test_topic",0));

and check the position, I sit at offset 995 and can now poll and receive all the data my producer put in.

What is it that I don't understand about subscriptions? I don't need multiple consumers each handling only a part of the data; my consumer needs to get all the data of a certain topic. Why does the standard subscription approach, the one shown in all the tutorials, not work for me? I do understand that partitions are for load balancing consumers. I just don't understand what I am doing wrong with the subscription.

consumer config properties
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "postproc-" + EnvUtils.getAppInst()); // each instance is its own group -> receives everything
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    KafkaConsumer<Long, byte[]> consumer = new KafkaConsumer<Long, byte[]>(props);

producer config
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 2);
        props.put("batch.size", 16384);
        props.put("linger.ms", 5000);
        props.put("buffer.memory", 1024 * 1024 * 10); // 10 MB
        props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<>(props);

producer execution
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
    event.writeDelimitedTo(out);
    for (long a = 10; a < 20; a++) {
        long rand = new Random(a).nextLong();
        producer.send(new ProducerRecord<>("test_topic", rand, out.toByteArray()));
    }
    producer.flush();
} catch (IOException e) {
    e.printStackTrace();
}

consumer execution

consumer.subscribe(Arrays.asList("test_topic"));
ConsumerRecords<Long,byte[]> records = consumer.poll(10);
for (ConsumerRecord<Long, byte[]> r : records) { ... }
asked Mar 14 '23 by sam st

2 Answers

I managed to solve the issue. The problem was timeouts: when polling, I didn't give it enough time to complete. I assume assigning a partition is simply a lot faster and therefore completed in time, while the standard subscription poll takes longer, never actually finished, and did not commit. At least I think that was the problem. With longer timeouts it works.
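The pattern behind "give it enough time" can be sketched without Kafka itself: retry the poll until data arrives or a deadline passes, rather than relying on a single short poll. `PollLoop` and `pollUntil` below are illustrative names, not part of the Kafka API; the real consumer's `poll()` performs group coordination on its first calls after `subscribe()`, which is why a single `poll(10)` tends to return empty.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Illustrative helper, not Kafka API: keep polling until data arrives or a
// deadline passes, instead of relying on one short poll. With subscribe(),
// the first polls also have to join the group and wait for partitions to be
// assigned, so a single poll with a tiny timeout usually returns nothing.
public class PollLoop {
    static <T> List<T> pollUntil(Supplier<List<T>> poll, Duration timeout) {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            List<T> batch = poll.get(); // stands in for consumer.poll(...)
            if (!batch.isEmpty()) {
                return batch;
            }
        }
        return Collections.emptyList();
    }
}
```

With the real client, the equivalent fix is simply polling in a loop (or passing a larger timeout to `poll`), so the consumer has time to finish the group rebalance before giving up.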

answered Mar 21 '23 by sam st


You are missing this property, I think:

auto.offset.reset=earliest

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer's group
  • anything else: throw exception to the consumer.

Reference: http://kafka.apache.org/documentation.html#highlevelconsumerapi
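Applied to the question's setup, this is roughly what the fix looks like (a sketch; `consumerProps` is an illustrative helper, and the `group.id` scheme is copied from the question). Since each run uses a fresh `group.id`, the group has no committed offset, and without `auto.offset.reset=earliest` the consumer falls back to the default `latest` and only sees records produced after it joins:

```java
import java.util.Properties;

// Sketch of the asker's consumer config with the suggested property added.
// consumerProps is a hypothetical helper for illustration.
public class ConsumerConfigSketch {
    static Properties consumerProps(String appInstance) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "postproc-" + appInstance); // fresh group -> no committed offset
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest"); // start from the beginning when no offset exists
        props.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }
}
```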

answered Mar 21 '23 by Nautilus