I have the following queue consumer class which runs recursively through promises:
"use strict";
var queue = require("./queue"),
helpers = require("./helpers"),
vendors = require("../config/vendors"),
queueConf = require("../config/queue");
function Consumer() {
this.queue = new queue.TaskQueue();
this.currentItem = null;
this.port = null;
this.payload = null;
}
Consumer.prototype.payloadSuccessCb = function (data) {
this.payload = data;
this.run();
};
Consumer.prototype.failureCb = function (data) {
console.error(data);
throw new Error(data);
//TODO: Continue queue processing despite the error
};
Consumer.prototype.processItem = function (data) {
this.currentItem = data;
process.send("Proccess " + process.pid + " is processing item " + this.currentItem);
helpers.getPayload(this.currentItem).then(this.payloadSuccessCb, this.failureCb);
};
Consumer.prototype.wait = function () {
var self = this;
process.send("Proccess " + process.pid + " is waiting for new items");
setTimeout(function () {
self.run();
}, queueConf.waitTime);
};
Consumer.prototype.queueSuccessFb = function (data) {
console.error("here");
if (data) {
this.processItem(data);
} else {
this.wait();
}
};
Consumer.prototype.run = function () {
//this.port = helpers.getVendorPortById(this.currentItem);
this.queue.pop().then(this.queueSuccessFb, this.failureCb);
};
exports.Consumer = Consumer;
I have defined a test which will basically assert that the correct workflow is happening, and that the consumer ultimately handles all of the tasks in the queue (this is an integration test working in front of a real Redis broker)
Test:
"use strict";
var consumer = require("./../../src/consumer"),
queue = require("./../../src/queue"),
Q = require("Q"),
sinon = require("sinon"),
assert = require("assert"),
queueConf = require("./../../config/queue"),
NUM_OF_ITEMS = 5,
queueInstance,
spy,
consumerInstance;
describe("consumer", function () {
beforeEach(function () {
queueInstance = new queue.TaskQueue();
});
describe("waiting for tasks while the queue is empty", function () {
describe("queue success call back", function () {
before(function () {
consumerInstance = new consumer.Consumer();
spy = sinon.spy(consumerInstance, "queueSuccessFb");
});
it("should call the success callback once per the defined period", function (done) {
consumerInstance.run();
setTimeout(function () {
sinon.assert.calledOnce(spy);
done();
}, queueConf.waitTime);
});
it("should call the success callback twice per the defined period + 1", function (done) {
consumerInstance.run();
setTimeout(function () {
sinon.assert.calledTwice(spy);
done();
}, queueConf.waitTime * 2);
});
});
describe("wait function", function () {
before(function () {
consumerInstance = new consumer.Consumer();
spy = sinon.spy(consumerInstance, "wait");
});
});
});
describe("task handling", function () {
beforeEach(function (done) {
this.timeout(6000);
var i, promises = [];
queueInstance = new queue.TaskQueue();
for (i = 1; i <= NUM_OF_ITEMS; i += 1) {
promises.push(queueInstance.push(i));
}
Q.all(promises).then(function () {
done();
});
});
afterEach(function () {
queueInstance.empty();
});
describe("sucess callback", function () {
before(function () {
consumerInstance = new consumer.Consumer();
spy = sinon.spy(consumerInstance, "queueSuccessFb");
});
it("should run all of the available tasks one by one", function (done) {
this.timeout(6000);
consumerInstance.run();
setTimeout(function () {
console.info(spy.callCount);
assert(spy.callCount === NUM_OF_ITEMS);
done();
}, 2000);
});
});
});
});
My problem is that the call count always equals 1.
I thought at first that a andCallThrough()
method invocation is required, similar to the way this works in Jasmine, but then found out that the actual function is being invoked.
Tried using sinon.useFakeTimers()
but that did not work at all (test did not seem to wait, timeout in the consumer class was not firing);
Expected behavior: callCount
is as NUM_OF_ITEMS
(via recursive calls).
Actual behavior: callCount
is always 1.
Hi it's a little confusing to understand what your queue class is doing. Is it a singleton?
If it's not a singleton your consumer is initialised with a fresh empty queue on construction.
function Consumer() {
this.queue = new queue.TaskQueue();
...
}
...
describe("success callback", function () {
before(function () {
consumerInstance = new consumer.Consumer();
spy = sinon.spy(consumerInstance, "queueSuccessFb");
});
....
This won't be the same as the queue you created in
describe("task handling", function () {
beforeEach(function (done) {
...
queueInstance = new queue.TaskQueue();
...
});
...
And as the queues are not the same spy.callCount !== NUM_OF_ITEMS
Unless of course it's a singleton ie:
new queue.TaskQueue() === new queue.TaskQueue();
My suggestion is to enable the TaskQueue to be supplied to the Consumer constructor so you know that the consumer is operating over the expected queue
function Consumer(queue) {
this.queue = queue;
...
}
describe("task handling", function () {
beforeEach(function (done) {
...
this.queueInstance = new queue.TaskQueue();
...
});
...
describe("success callback", function () {
before(function () {
consumerInstance = new consumer.Consumer(this.queueInstance);
spy = sinon.spy(consumerInstance, "queueSuccessFb");
});
....
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With