
Kafka consumer configuration / performance issues

I'm trying out Kafka as an alternative to AWS SQS. The primary motivation is to improve performance, since Kafka would eliminate the constraint of pulling 10 messages at a time with a cap of 256 KB. Here's a high-level scenario of my use case. I have a bunch of crawlers which send documents for indexing. The size of the payload is around 1 MB on average. The crawlers call a SOAP endpoint, which in turn runs producer code to submit the messages to a Kafka queue. The consumer app picks up the messages and processes them. For my test box, I've configured the topic with 30 partitions and a replication factor of 2. The two Kafka instances are running with one ZooKeeper instance. The Kafka version is 0.10.0.
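
For reference, the producer side is essentially a plain KafkaProducer send call wrapped by the SOAP handler. A simplified sketch (the topic name and the serializer wiring shown here are illustrative, not my exact code):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Simplified producer sketch -- topic name and serializer classes are illustrative
Properties props = new Properties();
props.put("bootstrap.servers", "kafkahost1:9092,kafkahost2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer");

Producer<String, TextAnalysisRequest> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("full_index_topic", documentId, textAnalysisObj));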

For my testing, I published 7 million messages to the queue. I created a consumer group with 30 consumer threads, one per partition. I was initially under the impression that this would substantially speed up processing compared to what I was getting via SQS. Unfortunately, that was not the case. In my case, the processing of the data is complex and takes 1-2 minutes on average to complete. That led to a flurry of partition rebalances, as the threads were not able to heartbeat in time. I could see a bunch of messages in the log citing

Auto offset commit failed for group full_group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in the poll() with max.poll.records.

This led to the same message being processed multiple times. I tried playing around with the session timeout, max.poll.records and poll time to avoid this, but that slowed down the overall processing big time. Here are some of the configuration parameters:

metadata.max.age.ms = 300000
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
enable.auto.commit = true
max.poll.records = 10000
request.timeout.ms = 310000
heartbeat.interval.ms = 100000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
fetch.max.wait.ms = 500
connections.max.idle.ms = 540000
session.timeout.ms = 300000
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
metrics.sample.window.ms = 30000
auto.offset.reset = latest

I reduced the consumer poll time to 100 ms. That reduced the rebalancing issues and eliminated duplicate processing, but slowed down the overall process significantly. It ended up taking 35 hours to process all 6 million messages, compared to 25 hours using the SQS-based solution. Each consumer thread retrieved 50-60 messages per poll on average, though some of them polled 0 records at times. I'm not sure about this behavior when there is a huge number of messages available in the partition. The same thread was able to pick up messages during the subsequent iteration. Could this be due to rebalancing?

Here's my consumer code:

while (true) {
    try {
        ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(100);
        for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
            TextAnalysisRequest textAnalysisObj = record.value();
            if (textAnalysisObj != null) {
                // Process record (this call takes 1-2 minutes on average)
                PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
            }
        }
    } catch (Exception ex) {
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
}
I understand that the record-processing part is a bottleneck in my case. But I'm sure a few folks here have a similar use case with long processing times. I thought of doing asynchronous processing by spinning up each processor in its own dedicated thread, or by using a thread pool with a large capacity, but I'm not sure whether it would create a big load on the system. At the same time, I've seen a couple of instances where people have used the pause and resume APIs to perform the processing while avoiding the rebalancing issue.
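
As far as I understand it, the pause/resume pattern looks roughly like this (just a sketch of the idea, not my actual code; the executor and the processBatch() call are placeholders for my own processing logic):

// Sketch of the pause/resume pattern (illustrative only).
// 'executor' is assumed to be an ExecutorService; processBatch() stands in for my processing code.
while (true) {
    ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(100);
    if (!records.isEmpty()) {
        // Stop fetching new records while the current batch is being processed
        consumer.pause(consumer.assignment());
        Future<?> done = executor.submit(() -> processBatch(records));

        // Keep calling poll() so the consumer keeps heartbeating and stays in the group;
        // while paused, poll() returns no records
        while (!done.isDone()) {
            consumer.poll(100);
        }
        consumer.resume(consumer.assignment());
    }
}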

I'm really looking for some advice / best practices in this situation, particularly the recommended configuration settings around heartbeat, request timeout, max poll records, auto commit interval, poll interval, etc. If Kafka is not the right tool for my use case, please let me know as well.

Shamik


1 Answer

You can start by processing the messages asynchronously, in a separate thread from the one that reads from Kafka. That way auto-committing will be very fast and Kafka will not cut your session. Something like this:

private final BlockingQueue<TextAnalysisRequest> requests =
        new LinkedBlockingQueue<>();

In the reading thread:

while (true) {
    try {
        ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(100);
        for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
            TextAnalysisRequest textAnalysisObj = record.value();
            if (textAnalysisObj != null) {
                // Hand the record off to the processing thread instead of processing it here
                requests.offer(textAnalysisObj);
            }
        }
    } catch (Exception ex) {
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
}

In the processing thread:

while (!Thread.currentThread().isInterrupted()) {
    try {
        TextAnalysisRequest textAnalysisObj = requests.take();
        PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
    } catch (InterruptedException e) {
        LOGGER.info("Process thread interrupted", e);
        Thread.currentThread().interrupt();
    } catch (Throwable t) {
        LOGGER.warn("Unexpected throwable while processing.", t);
    }
}

Also take a look at this documentation for a strategy to send large messages through Kafka: http://blog.cloudera.com/blog/2015/07/deploying-apache-kafka-a-practical-faq/

In short, it says that Kafka performs best with small messages of around 10 KB; if you need to send larger messages, it's better to put them on network storage and send just their location through Kafka, or to split them.
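
For example, the producer could upload each document to shared storage and publish only a reference to it; a rough sketch (storeOnSharedStorage / readFromSharedStorage and the topic name are made-up placeholders, not a real API):

// Producer side: store the ~1 MB document externally and send only its location through Kafka
String location = storeOnSharedStorage(documentId, documentBytes);   // e.g. an NFS path or S3 key
producer.send(new ProducerRecord<>("documents-to-index", documentId, location));

// Consumer side: fetch the payload back using the location carried in the message
byte[] documentBytes = readFromSharedStorage(record.value());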

dcernahoschi