
Kafka upgrade to 0.9 with new consumer API

We are upgrading our Kafka implementation to 0.9 and using the new consumer Java API to create the consumer. In the code below, LINE A sets the topic for the consumer, and LINE B is the call to our service that processes the received messages. The problem is that we get an exception whenever message processing takes more than 30 seconds.

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.CommitFailedException;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-group");
    props.put("auto.offset.reset", "earliest");
    props.put("heartbeat.interval.ms", "1000");
    props.put("receive.buffer.bytes", 10485760);
    props.put("fetch.message.max.bytes", 5242880);
    props.put("enable.auto.commit", false);

    KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);

    // Alternative: assign a specific partition to the consumer
    // TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0);
    // consumer.assign(Arrays.asList(partition0));

    // LINE A: subscribe to the topic without specifying a partition
    consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp());

    while (true) {
        try {
            ConsumerRecords<Object, Object> records = consumer.poll(1000);
            consumeFromQueue(records); // LINE B: our service processes the records
            consumer.commitSync();
        } catch (CommitFailedException e) {
            e.printStackTrace();
            System.out.println("CommitFailedException");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Exception in while consuming messages");
        }
    }

The exception is:

    2016-03-03 10:47:35.095  INFO 6448 --- [ask-scheduler-3] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483647 dead.
    2016-03-03 10:47:35.096 ERROR 6448 --- [ask-scheduler-3] o.a.k.c.c.internals.ConsumerCoordinator : Error ILLEGAL_GENERATION occurred while committing offsets for group TEST-GROUP
    CommitFailedException
    org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:493)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)

The above exception is thrown while committing the offset. Any suggestions would help, thank you.

Sunny Gupta asked Mar 09 '16


2 Answers

This happens because the new consumer is single-threaded, and the only way it can keep its heartbeat with the consumer group is by polling or committing offsets. If your processing keeps the loop away from poll()/commitSync() for longer than the session timeout (30 seconds by default), the group coordinator marks your consumer as dead and triggers a group rebalance. You can either increase session.timeout.ms (keeping request.timeout.ms larger than it) or split the work of consuming and processing between two threads.
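A minimal sketch of the two-thread split: the polling thread only hands records off to a queue and stays free to keep polling (and thus heartbeating), while a worker thread does the slow processing. The class and method names, and the String stand-in for ConsumerRecord, are assumptions for illustration, not the asker's actual service code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollProcessSplit {
    // Simulates handing one batch from the poll loop to a worker thread.
    public static List<String> process(List<String> polledMessages) throws InterruptedException {
        BlockingQueue<String> handoff = new LinkedBlockingQueue<>();
        List<String> processed = Collections.synchronizedList(new ArrayList<>());

        // Stand-in for the poll loop: enqueue everything poll() returned.
        for (String msg : polledMessages) {
            handoff.put(msg);
        }

        ExecutorService worker = Executors.newSingleThreadExecutor();
        worker.submit(() -> {
            try {
                String msg;
                // Drain the queue; in a real consumer this loop runs for the
                // life of the application, not until the queue is empty.
                while ((msg = handoff.poll(200, TimeUnit.MILLISECONDS)) != null) {
                    processed.add("handled:" + msg); // slow processing goes here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.shutdown();
        worker.awaitTermination(5, TimeUnit.SECONDS);
        return processed;
    }
}
```

In the real consumer loop you would additionally commit only offsets the worker has reported as processed, and do that commit from the polling thread, since KafkaConsumer itself is not thread-safe.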

Nautilus answered Nov 15 '22


You could limit the number of messages returned by poll() by setting

max.partition.fetch.bytes

to a suitable threshold that is larger than your largest message, but low enough that each poll() returns fewer messages.

Kafka 0.10.x supports explicitly limiting the number of records returned to the client by setting

max.poll.records
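A sketch combining both knobs (the byte ceiling for 0.9 and the record cap for 0.10+). The specific values here are illustrative assumptions, not tuned recommendations:

```java
import java.util.Properties;

public class PollBatchConfig {
    // Caps how much data each poll() can return, so processing a single
    // batch stays well under the session timeout. Values are examples only.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        // Kafka 0.9: per-partition byte ceiling; must exceed the largest message.
        props.put("max.partition.fetch.bytes", "1048576"); // assumed 1 MB ceiling
        // Kafka 0.10+: direct cap on the record count per poll().
        props.put("max.poll.records", "50"); // assumed batch size
        return props;
    }
}
```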
nilsmagnus answered Nov 15 '22