I've implemented a Kafka consumer application, and I want to know: if I run this app in PM2 cluster mode, will all the cores consume the same message or different messages? Is there a way I can verify it? And is it ideal to run this app in cluster mode? The reason I'm running it in cluster mode is that our Kafka setup produces a large number of messages.
Also, when I currently run this in PM2 cluster mode, all of our cores reach 100% CPU usage. Is that supposed to happen?
FYI: I'm using https://www.npmjs.com/package/no-kafka
Will all the cores consume the same message or different messages? Is there a way I can verify it?
This depends on your topic configuration and your consumer configuration. Let's take an example.
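As a rough illustration: in Kafka, each partition of a topic is assigned to at most one consumer within a consumer group. Processes that join the same group therefore receive different messages, while processes in different groups each receive every message. The sketch below only simulates a round-robin assignment in plain JavaScript. `assignPartitions` and the worker names are hypothetical and are not part of no-kafka or Kafka itself, and the real assignment strategy depends on your consumer configuration.

```javascript
// Simulates how a consumer group spreads topic partitions over its members.
// Hypothetical helper for illustration only; not a no-kafka or Kafka API.
function assignPartitions(partitionCount, consumerIds) {
  const assignment = new Map(consumerIds.map((id) => [id, []]));
  for (let partition = 0; partition < partitionCount; partition++) {
    // Each partition goes to exactly one consumer in the group.
    const owner = consumerIds[partition % consumerIds.length];
    assignment.get(owner).push(partition);
  }
  return assignment;
}

// Four PM2 workers in the same group, topic with six partitions.
const sameGroup = assignPartitions(6, ['worker-0', 'worker-1', 'worker-2', 'worker-3']);

// Topic with a single partition: only one worker receives anything.
const onePartition = assignPartitions(1, ['worker-0', 'worker-1']);

for (const [id, partitions] of sameGroup) {
  console.log(`${id} -> partitions [${partitions.join(', ')}]`);
}
for (const [id, partitions] of onePartition) {
  console.log(`${id} -> partitions [${partitions.join(', ')}]`);
}
```

To verify this on the real application, log `process.pid` together with the partition and offset of every message your handler receives. If the same partition and offset show up under two different process IDs, the processes are not sharing a consumer group. Note also that a topic with fewer partitions than workers leaves the extra workers idle.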
Also, when I currently run this in PM2 cluster mode, all of our cores reach 100% CPU usage. Is that supposed to happen?
I'm not really familiar with no-kafka or with how it processes messages.
But check whether the library waits for commits to happen before fetching the next batch of messages.
If it doesn't, your process may be creating too many concurrent handlers for the messages.
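The idea can be sketched as follows, assuming the consumer hands you a batch of messages and lets you decide when to move on. `processBatch`, `handleMessage`, and `commitOffset` are hypothetical names used only for this illustration and are not no-kafka functions. Check the library's documentation for how its own handler and commit calls behave.

```javascript
// Processes a batch one message at a time, so at most one handler is in flight.
// `handleMessage` and `commitOffset` are hypothetical callbacks, not no-kafka APIs.
async function processBatch(messages, handleMessage, commitOffset) {
  for (const message of messages) {
    await handleMessage(message);       // wait for the work to finish
    await commitOffset(message.offset); // then record progress
  }
}

// Demo with fake messages that tracks how many handlers run at once.
let inFlight = 0;
let maxInFlight = 0;
const committed = [];

async function fakeHandler(message) {
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  await new Promise((resolve) => setTimeout(resolve, 5)); // simulated work
  inFlight--;
}

async function fakeCommit(offset) {
  committed.push(offset);
}

const batch = [{ offset: 0 }, { offset: 1 }, { offset: 2 }];
const done = processBatch(batch, fakeHandler, fakeCommit).then(() => {
  console.log(`max handlers in flight: ${maxInFlight}, committed: ${committed.join(', ')}`);
});
```

If handlers were instead started without `await`, every message in every fetched batch would be processed at once, which is one plausible way to drive all cores to 100%.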
PM2-based clustering is good only for network servers, because the clustered processes share the incoming network port and the requests are distributed among them.
In your case, the source of data is a message subscription, which has to be distributed to the cluster's worker processes manually.
So, to be on the safe side, the master process should interact with the source of data and distribute messages evenly to the worker processes. Externally it then appears to be a single consumer, yet it can still process messages on all CPU cores.
The example below demonstrates such a setup without depending on PM2-based clustering:
const cluster = require('cluster');
const _ = require('lodash');
const os = require('os');

// dispatch index
let dispatchIndex = 0;

/**
 * Dispatches data to workers in a cyclic fashion
 * @param {*} data - data to process
 */
function dispatch(data) {
  // ensure master
  if (!cluster.isMaster) {
    throw new Error('Only master can dispatch');
  }

  // get worker ids, sorted
  const workersIds = _.sortBy(_.keys(cluster.workers), _.identity);

  // ensure at least one worker is available
  if (workersIds.length < 1) {
    throw new Error('No worker process alive');
  }

  // select next worker
  dispatchIndex = dispatchIndex >= workersIds.length ? 0 : dispatchIndex;
  const worker = cluster.workers[workersIds[dispatchIndex]];
  dispatchIndex++;

  // send data to worker
  worker.send(data);
}

// Main Script
if (cluster.isMaster) {
  // Setup master process
  console.info(`Master ${process.pid} started.`);

  // fork worker processes to match available CPUs
  const numCpu = os.cpus().length;
  for (let i = 0; i < numCpu; i++) {
    cluster.fork();
  }

  // *** Get/Subscribe data from external source and dispatch to workers ***
  setInterval(() => dispatch({ a: 'value' }), 1000);
} else if (cluster.isWorker) {
  // Setup worker process
  console.info(`Worker ${process.pid} started.`);

  // *** handle dispatched data ***
  process.on('message', (data) => {
    console.info(`Data processed by ${process.pid}`);
  });
}
It's also worth reading the Node.js cluster module documentation.