can I limit consumption of kafka-node consumer?

It seems like my kafka-node consumer:

var kafka = require('kafka-node');
var client = new kafka.Client();
var consumer = new kafka.Consumer(client, [], {
     ...
    });

is fetching far more messages than I can handle in certain cases. Is there a way to limit it (for example, to accept no more than 1000 messages per second, possibly using the pause API)?

  • I'm using kafka-node, which seems to have a limited API compared to the Java client
Mattan Bitner asked Jul 23 '16 20:07

2 Answers

In Kafka, polling and processing should happen in a coordinated, synchronized way: after each poll, you should process all received data before you poll again. This pattern automatically throttles consumption to the maximum throughput your client can handle.

Something like this (pseudo-code):

while(isRunning) {
  messages = poll(...)
  for(m : messages) {
    process(m);
  }
}

(That is the reason why there is no parameter "fetch.max.messages" -- you simply do not need it.)
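To make the loop above concrete in plain Node.js (this is an illustrative sketch only, not kafka-node's API: fetchBatch is a stand-in for a real Kafka poll), the key point is that the next fetch only happens after the current batch is fully processed:

```javascript
// Poll-then-process pattern: the consumer can never outrun the processing
// code, because fetching the next batch waits for the current one to finish.
async function runConsumer(fetchBatch, processMessage, isRunning) {
  let handled = 0;
  while (isRunning()) {
    const messages = await fetchBatch();   // "poll"
    for (const m of messages) {
      await processMessage(m);             // process before the next poll
      handled++;
    }
  }
  return handled;
}

// Demo with three in-memory batches standing in for broker fetches.
const batches = [[1, 2], [3, 4], [5, 6]];
runConsumer(
  () => Promise.resolve(batches.shift()),
  (m) => Promise.resolve(m),
  () => batches.length > 0
).then((n) => console.log('processed', n)); // prints "processed 6"
```

If processing is slow, polls simply happen less often; no extra rate-limit parameter is needed.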

Matthias J. Sax answered Oct 13 '22 12:10


I had a similar situation: I was consuming messages from Kafka and had to throttle consumption because my consumer service depended on a third-party API that had its own constraints.

I used async/queue along with a wrapper around async/cargo, called asyncTimedCargo, for batching purposes. The cargo collects messages from the kafka-node consumer and pushes them to the queue once it reaches a size limit (batch_config.batch_size) or a timeout (batch_config.batch_timeout). async/queue provides saturated and unsaturated callbacks, which you can use to pause consumption when your queue workers are busy. This stops the cargo from filling up, so your app does not run out of memory; consumption resumes once the queue becomes unsaturated.

//cargo-service.js
// asyncTimedCargo is a custom wrapper around async/cargo that flushes either
// when batch_config.batch_size tasks have accumulated or after
// batch_config.batch_timeout milliseconds, whichever comes first.
module.exports = function(key){
    return new asyncTimedCargo(function(tasks, callback) {
        // Each task is a kafka-node message; its payload lives in task.value.
        var postBody = tasks.map(function(task) {
            return JSON.parse(task.value);
        });
        var postJson = {
            "json": {"request": postBody}
        };
        sms_queue.push(postJson);
        callback();
    }, batch_config.batch_size, batch_config.batch_timeout);
};

//kafka-consumer.js
var cargo = require('./cargo-service')(key);
consumer.on('message', function (message) {
    if (message && message.value && utils.isValidJsonString(message.value)) {
        cargo.push(message);
    }
    else {
        logger.error('Invalid JSON Message');
    }
});

// sms-queue.js
var async = require('async');

var sms_queue = async.queue(async.retryable({
    times: queue_config.num_retries,
    errorFilter: function (err) {
        logger.info("inside retry");
        console.log(err);
        return !!err; // retry on any error
    }
}, function (task, callback) {
    // your worker task for the queue
    callback();
}), queue_config.queue_worker_threads);

sms_queue.saturated = function() {
    consumer.pause();
    logger.warn('Queue saturated Consumption paused: ' + sms_queue.running());
};
sms_queue.unsaturated = function() {
    consumer.resume();
    logger.info('Queue unsaturated Consumption resumed: ' + sms_queue.running());
};
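The saturated/unsaturated backpressure idea above can be shown without any dependencies. The sketch below is illustrative only (plain Node.js, no kafka-node or async): a tiny queue fires a pause callback when its backlog crosses a high-water mark and a resume callback once it drains, exactly the hooks you would wire to consumer.pause() and consumer.resume():

```javascript
// Minimal backpressure queue: pause the producer when the backlog reaches
// highWater, resume it when the backlog is fully drained.
function backpressureQueue(highWater, onPause, onResume) {
  var backlog = [];
  var paused = false;
  return {
    push: function (task) {
      backlog.push(task);
      if (!paused && backlog.length >= highWater) {
        paused = true;
        onPause();                 // e.g. consumer.pause()
      }
    },
    drainOne: function () {
      var task = backlog.shift();
      if (paused && backlog.length === 0) {
        paused = false;
        onResume();                // e.g. consumer.resume()
      }
      return task;
    },
    size: function () { return backlog.length; }
  };
}

// Demo: pauses after the third push, resumes after the backlog drains.
var events = [];
var q = backpressureQueue(3,
  function () { events.push('pause'); },
  function () { events.push('resume'); });
[1, 2, 3].forEach(function (t) { q.push(t); });
while (q.size() > 0) q.drainOne();
console.log(events); // prints [ 'pause', 'resume' ]
```

async/queue gives you the same behavior through its saturated and unsaturated callbacks, plus worker concurrency on top.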
Nikunj Jain answered Oct 13 '22 12:10