
Nodejs Kafka consumer with pm2 cluster

I've implemented a Kafka consumer application. If I run this app in pm2 cluster mode, will all the cores consume the same message or different messages? Is there a way I can verify it? And is it ideal to run this app in cluster mode? I'm running it in cluster mode because our Kafka setup produces a large number of messages.

Also, when I run this in pm2 cluster mode, all of our cores currently reach 100% CPU usage. Is this supposed to happen?

FYI: I'm using https://www.npmjs.com/package/no-kafka

FR STAR asked Jan 31 '26 05:01



2 Answers

Will all the cores consume the same message or different messages? Is there a way I can verify it?

This depends on your topic configuration and your consumer configuration. Let's take an example.

  • Let's say we have a topic with 3 partitions.
  • Now we start 1 consumer process with the consumer group "some_consumer_group". For consumer group details, have a look at https://www.npmjs.com/package/no-kafka#groupconsumer-new-unified-consumer-api.
  • Your one consumer is now listening to all 3 partitions.
  • Since Kafka maintains offsets per topic and per partition for each consumer group, your consumer receives each message once, from whichever of the 3 partitions it was written to. Hence no duplication of messages.
  • Now let's add one more consumer process to the mix.
  • Consumer 1 of "some_consumer_group" is now listening to partitions 0 and 1, while consumer 2 of the same group is listening to partition 2. (It might be the other way around.)
  • Finally, if we add one more consumer to the group, each consumer listens to exactly 1 partition.
  • With this setup, where all consumers share the same consumer group, you will not see duplicate messages.
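The walk-through above can be sketched as a small simulation. This is only an illustration of how partitions might be split among consumers in one group; `assignPartitions` is a hypothetical helper, not part of no-kafka, and the real assignment strategy used by the library or the broker may order things differently. It also shows a case the list does not cover: with more consumers than partitions, the extra consumers would sit idle, which matters if pm2 starts more processes than the topic has partitions.

```javascript
// Illustrative simulation only: NOT no-kafka's actual assignment code.
// Splits partitions 0..partitionCount-1 into contiguous ranges, one per consumer.
function assignPartitions(partitionCount, consumerIds) {
    const assignment = {};
    const base = Math.floor(partitionCount / consumerIds.length);
    const extra = partitionCount % consumerIds.length;
    let next = 0;

    consumerIds.forEach((id, index) => {
        // the first `extra` consumers take one additional partition
        const size = base + (index < extra ? 1 : 0);
        assignment[id] = [];
        for (let i = 0; i < size; i++) {
            assignment[id].push(next++);
        }
    });

    return assignment;
}

console.log(assignPartitions(3, ['c1']));                   // { c1: [ 0, 1, 2 ] }
console.log(assignPartitions(3, ['c1', 'c2']));             // { c1: [ 0, 1 ], c2: [ 2 ] }
console.log(assignPartitions(3, ['c1', 'c2', 'c3']));       // { c1: [ 0 ], c2: [ 1 ], c3: [ 2 ] }
console.log(assignPartitions(3, ['c1', 'c2', 'c3', 'c4'])); // c4 gets an empty list
```

To check this against a real setup, one option is to log `process.pid` together with the topic, partition, and offset of every message each process handles; if the group works as described, no (partition, offset) pair should show up under two different pids.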

Also, when I run this in pm2 cluster mode, all of our cores currently reach 100% CPU usage. Is this supposed to happen?

I'm not really familiar with no-kafka or with how it processes messages.

But check whether the library waits for commits to happen before fetching the next batch of messages.

If it does not, there is a chance your process is creating too many concurrent handlers for the messages.
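To make the "too many handlers" concern concrete, here is a minimal sketch that does not use Kafka at all. `processSequentially`, `processAllAtOnce`, and `slowHandler` are hypothetical names made up for this illustration; they only compare how many handlers are in flight when each message is awaited before the next one starts versus when every handler is started immediately.

```javascript
// Hypothetical helper: handles one message at a time and returns the
// highest number of handlers that were running concurrently.
async function processSequentially(messages, handleMessage) {
    let inFlight = 0;
    let maxInFlight = 0;
    for (const message of messages) {
        inFlight++;
        maxInFlight = Math.max(maxInFlight, inFlight);
        await handleMessage(message); // wait before starting the next one
        inFlight--;
    }
    return maxInFlight;
}

// Hypothetical helper: starts every handler right away, with no backpressure.
async function processAllAtOnce(messages, handleMessage) {
    let inFlight = 0;
    let maxInFlight = 0;
    await Promise.all(messages.map(async (message) => {
        inFlight++;
        maxInFlight = Math.max(maxInFlight, inFlight);
        await handleMessage(message);
        inFlight--;
    }));
    return maxInFlight;
}

// Stand-in for real message processing (assumption: it is asynchronous).
const slowHandler = () => new Promise((resolve) => setTimeout(resolve, 5));

async function main() {
    const batch = [1, 2, 3, 4, 5];
    console.log('sequential max in flight:', await processSequentially(batch, slowHandler)); // 1
    console.log('all-at-once max in flight:', await processAllAtOnce(batch, slowHandler));   // 5
}

main();
```

If the consumer keeps fetching while earlier handlers are still running, the second pattern is roughly what happens per batch, and the number of pending handlers can grow with the message rate.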

rahulmamgain answered Feb 02 '26 22:02



PM2-based clustering is a good fit only for network servers, because the clustered processes share the incoming network port and the requests are distributed among them.

In your case, the source of data is a message subscription, which has to be distributed to the cluster's worker processes manually.

So, to be on the safe side, the master process should interact with the source of data and evenly distribute messages to the worker processes. Externally it then appears to be a single consumer, but it can still process messages on all CPU cores.

The example below demonstrates such a setup without depending on PM2-based clustering:

const cluster = require('cluster');
const _ = require('lodash');
const os = require('os');

// dispatch index
let dispatchIndex = 0;

/**
 * Dispatches data to workers in a cyclic fashion
 * @param {*} data - data to process
 */
function dispatch(data) {

    // ensure master
    if (!cluster.isMaster) {
        throw new Error('Only master can dispatch');
    }

    // get worker ids, sorted
    const workersIds = _.sortBy(_.keys(cluster.workers), _.identity);

    // ensure at least one worker is available
    if (workersIds.length < 1) {
        throw new Error('No worker process alive');
    }

    // select next worker
    dispatchIndex = dispatchIndex >= workersIds.length ? 0 : dispatchIndex;
    const worker = cluster.workers[workersIds[dispatchIndex]];
    dispatchIndex++;

    // send data to worker
    worker.send(data);
}


// Main Script
if (cluster.isMaster) {

    // Setup master process
    console.info(`Master ${process.pid} started.`);

    // fork worker processes to match available CPUs
    const numCpu = os.cpus().length;
    for (let i = 0; i < numCpu; i++) {
        cluster.fork();
    }

    // *** Get/Subscribe data from external source and dispatch to workers ***
    setInterval(() => dispatch({ a: 'value' }), 1000);

} else if (cluster.isWorker) {

    // Setup worker process
    console.info(`Worker ${process.pid} started.`);

    // *** handle dispatched data ***
    process.on('message', (data) => {
        console.info(`Data processed by ${process.pid}`);
    });
}

It's also worth reading the cluster module documentation.

S.D. answered Feb 02 '26 21:02
