Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Nodejs sqs queue processor

I am trying to write a nodejs sqs queue processor.

"use strict";
// Application settings; readMessage() reads sqs_distribution_url and CI_FC_PATH from this object.
var appConf = require('./config/appConf');
var AWS = require('aws-sdk');
// Load the AWS SDK configuration from a local JSON file.
AWS.config.loadFromPath('./config/aws_config.json');
// SQS client shared by the receive and delete calls in readMessage().
var sqs = new AWS.SQS();
// child_process.exec: runs a command line through a shell.
var exec = require('child_process').exec;
/**
 * Receives at most one message from the distribution queue (long polling for
 * up to 20 seconds), passes its ids to the CodeIgniter CLI handler, and deletes
 * the message once the handler prints exactly 'Success'.
 *
 * Every failure (receive error, unparsable body, handler error, delete error)
 * is logged instead of thrown: an exception raised inside these asynchronous
 * callbacks cannot be caught by any caller and would take the process down.
 * A message that is not deleted stays on the queue.
 */
function readMessage() {
  sqs.receiveMessage({
    "QueueUrl": appConf.sqs_distribution_url,
    "MaxNumberOfMessages": 1,
    "VisibilityTimeout": 30,
    "WaitTimeSeconds": 20
  }, function (err, data) {
    if (err) {
      // `data` is not populated on failure, so stop before dereferencing it.
      console.error('receiveMessage failed:', err);
      return;
    }

    var message = data && data.Messages && data.Messages[0];
    if (!message || typeof message.Body === 'undefined') {
      // The long poll finished without a usable message.
      return;
    }

    //sqs msg body
    var sqs_message_body;
    try {
      sqs_message_body = JSON.parse(message.Body);
    } catch (parseError) {
      console.error('Ignoring message with an invalid JSON body:', parseError);
      return;
    }
    if (sqs_message_body === null || typeof sqs_message_body !== 'object') {
      console.error('Ignoring message whose body is not a JSON object');
      return;
    }

    //make call to nodejs handler in codeigniter
    // execFile hands the arguments to php directly, without a shell, so
    // values taken from the queue message cannot inject shell syntax.
    var execFile = require('child_process').execFile;
    execFile('php', [
      appConf.CI_FC_PATH + '/index.php',
      'nodejs_handler',
      'make_contentq_call',
      String(sqs_message_body.contentq_cat_id),
      String(sqs_message_body.cnhq_cat_id),
      String(sqs_message_body.network_id)
    ], function (error, stdout, stderr) {
      if (error) {
        console.error('PHP handler failed:', error, stderr);
        return;
      }
      console.log('stdout: ' + stdout);
      if (stdout !== 'Success') {
        return;
      }
      //delete message from queue
      // The callback is required: without one the SDK only builds the
      // request and never sends it, so the message would not be deleted.
      sqs.deleteMessage({
        "QueueUrl": appConf.sqs_distribution_url,
        "ReceiptHandle": message.ReceiptHandle
      }, function (deleteError) {
        if (deleteError) {
          console.error('deleteMessage failed:', deleteError);
        }
      });
    });
  });
}
// Issue a single receive request; readMessage() does not schedule another one itself.
readMessage();

The above code works fine for a single message in the queue. How should I write this script so that it keeps polling the queue until all messages are processed? Should I use setTimeout?

like image 854
Yalamber Avatar asked Jul 18 '13 06:07

Yalamber


2 Answers

First of all, you should definitely use the long polling technique provided by Amazon; as I understand it, you are already using it, because you pass the "WaitTimeSeconds": 20 argument in the sqs.receiveMessage call. I hope that you didn't forget to configure it in the AWS web interface.

About polling for messages - you may use different techniques, including timers, but I think the simplest is to call your readMessage() function at the end of receiveMessage's (or even exec's) callback function. That way, processing of (or waiting for) the next message in the queue starts immediately after processing of the previous message ends.

UPDATE:

In my opinion, your new version of the code has too many readMessage() calls. I think it is better to minimize them to keep the code clearer and easier to maintain. But if you leave, for example, only one call at the end of your main receiveMessage callback, you will end up with a lot of PHP worker scripts running in parallel - which may not be so bad from a performance point of view - but you will have to add some complicated logic to control the number of parallel workers. I think you can cut some of the calls in the exec callback, and try to join the ifs and the calls in the main callback.

"use strict";
// Application settings; readMessage() reads sqs_distribution_url and CI_FC_PATH from this object.
var appConf = require('./config/appConf');
var AWS = require('aws-sdk');
// Load the AWS SDK configuration from a local JSON file.
AWS.config.loadFromPath('./config/aws_config.json');
// 20 * 1000 — presumably a delay in milliseconds (20 seconds); confirm the intended use.
var delay = 20 * 1000;
// SQS client shared by the receive and delete calls in readMessage().
var sqs = new AWS.SQS();
// child_process.exec: runs a command line through a shell.
var exec = require('child_process').exec;
/**
 * Polling loop: long-polls the distribution queue for one message, processes
 * it, and then polls again, so messages are handled strictly one at a time.
 *
 * Exactly one follow-up poll is started per receive. Starting more than one
 * would multiply the number of concurrent pollers (and PHP workers) with
 * every message processed.
 *
 * A failed receive is retried after `delay` milliseconds rather than
 * immediately, so a persistent error does not turn into a busy loop.
 */
function readMessage() {
  sqs.receiveMessage({
    "QueueUrl": appConf.sqs_distribution_url,
    "MaxNumberOfMessages": 1,
    "VisibilityTimeout": 30,
    "WaitTimeSeconds": 20
  }, function (err, data) {
    if (err) {
      // `data` is not populated on failure; back off before retrying.
      console.error('receiveMessage failed:', err);
      setTimeout(readMessage, delay);
      return;
    }

    var message = data && data.Messages && data.Messages[0];
    if (!message || typeof message.Body === 'undefined') {
      // Empty long poll: the wait already happened server-side, poll again.
      readMessage();
      return;
    }

    handleMessage(message, readMessage);
  });
}

/**
 * Runs the CodeIgniter CLI handler for one SQS message and deletes the
 * message when the handler prints exactly 'Success'.
 *
 * @param {Object} message - one entry of `data.Messages` (uses Body and ReceiptHandle).
 * @param {Function} done - called with no arguments exactly once when handling
 *   has finished, whether or not it succeeded. Failures are logged and the
 *   message is left on the queue.
 */
function handleMessage(message, done) {
  //sqs msg body
  var sqs_message_body;
  try {
    sqs_message_body = JSON.parse(message.Body);
  } catch (parseError) {
    console.error('Ignoring message with an invalid JSON body:', parseError);
    done();
    return;
  }
  if (sqs_message_body === null || typeof sqs_message_body !== 'object') {
    console.error('Ignoring message whose body is not a JSON object');
    done();
    return;
  }

  //make call to nodejs handler in codeigniter
  // execFile hands the arguments to php directly, without a shell, so
  // values taken from the queue message cannot inject shell syntax.
  var execFile = require('child_process').execFile;
  execFile('php', [
    appConf.CI_FC_PATH + '/index.php',
    'nodejs_handler',
    'make_contentq_call',
    String(sqs_message_body.contentq_cat_id),
    String(sqs_message_body.cnhq_cat_id),
    String(sqs_message_body.network_id)
  ], function (error, stdout, stderr) {
    if (error) {
      console.error('PHP handler failed:', error, stderr);
      done();
      return;
    }
    if (stdout !== 'Success') {
      done();
      return;
    }
    //delete message from queue
    sqs.deleteMessage({
      "QueueUrl": appConf.sqs_distribution_url,
      "ReceiptHandle": message.ReceiptHandle
    }, function (deleteError) {
      if (deleteError) {
        console.error('deleteMessage failed:', deleteError);
      }
      done();
    });
  });
}
// Start the polling loop; readMessage() calls itself again from its callbacks.
readMessage();

About memory leaks: I think that you should not worry, because the next call of readMessage() happens in a callback function - so it is not truly recursive: each readMessage() call returns to its caller right after calling receiveMessage(), before the callback runs.

like image 187
zavg Avatar answered Nov 14 '22 20:11

zavg


If you are using node, use https://www.npmjs.com/package/sqs-worker module. It will do the job for you.

var SQSWorker = require('sqs-worker')

// Options passed to the SQSWorker constructor; `url` is presumably the queue to consume.
var options =
 { url: 'https://sqs.eu-west-1.amazonaws.com/001123456789/my-queue'
}

// `worker` is a function declaration further down, so it is hoisted and already usable here.
var queue = new SQSWorker(options, worker)

/**
 * Message handler passed to SQSWorker.
 *
 * @param {Object} notifi - incoming notification; its `Data` property is parsed as JSON.
 * @param {Function} done - completion callback, invoked as `done(error, success)`.
 * @throws {SyntaxError} when `notifi.Data` is not valid JSON (the parse error propagates unchanged).
 */
function worker(notifi, done) {
  // A malformed payload makes JSON.parse throw; nothing here intercepts it.
  var payload = JSON.parse(notifi.Data);

  // Do something with `payload`

  var processedOk = true;

  // Report completion. Passing `true` as the second argument means everything
  // went successfully and the message should not be seen again.
  done(null, processedOk);
}
like image 24
Madura Pradeep Avatar answered Nov 14 '22 20:11

Madura Pradeep