I have been looking everywhere for a headers exchange
example using RabbitMQ in Node.js. If someone could point me in the right direction, that would be great. Here's what I have so far:
publisher method (create a publisher)
RabbitMQ.prototype.publisher = function(exchange, type) {
console.log('New publisher, exchange: '+exchange+', type: '+type);
amqp.then(function(conn) {
conn.createConfirmChannel().then(function(ch) {
publishers[exchange] = {};
publishers[exchange].assert = ch.assertExchange(exchange, type, {durable: true});
publishers[exchange].ch = ch;
});
},function(err){
console.error("[AMQP]", err.message);
return setTimeout(function(){
self.connect(URI);
}, 1000);
}).then(null, console.log);
};
publish method
RabbitMQ.prototype.publish = function(exchange, routingKey, content, headers) {
try {
publishers[exchange].assert.then(function(){
publishers[exchange].ch.publish(exchange, routingKey, new Buffer(content), { persistent: true, headers: headers }, function(err, ok) {
if (err) {
console.error("[AMQP] publish", err);
offlinePubQueue.push([exchange, routingKey, content]);
publishers[exchange].ch.connection.close();
}
});
});
} catch (e) {
console.error("[AMQP] publish", e.message);
offlinePubQueue.push([exchange, routingKey, content]);
}
};
consumer method (create a consumer)
RabbitMQ.prototype.consumer = function(exchange, type, routingKey, cb) {
amqp.then(function(conn) {
conn.createChannel().then(function(ch) {
var ok = ch.assertExchange(exchange, type, {durable: true});
ok.then(function() {
ch.assertQueue('', {exclusive: true});
});
ok = ok.then(function(qok) {
var queue = qok.queue;
ch.bindQueue(queue,exchange,routingKey)
});
ok = ok.then(function(queue) {
ch.consume(queue, function(msg){
cb(msg,ch);
}, {noAck: false});
});
ok.then(function() {
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
});
});
}).then(null, console.warn);
};
The above example works fine with topics
, but I'm not sure how to make the transition to headers
. I am pretty sure I need to change my binding approach, but haven't been able to find any examples on how exactly to accomplish this.
Any help would be greatly appreciated!
I stumbled across this question looking for the same answers for amqplib. Unfortunately, like you I found all available documentation lacking. After looking over the source, reading over the protocol a bit, and trying out a few combinations, this finally did it for me.
...
let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
chan.publish(XCHANGE, '', Buffer.from(output), opts);
...
...
let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
chan.bindQueue(q.queue, XCHANGE, '', opts);
...
The full working code is below. The auth info below is faked, so you'll have to use your own. I'm also using ES6, nodejs version 6.5, and amqplib. There may be an issue with giving your headers x-
prefixes and/or using reserved words as header names, but I'm not too sure (I'd have to see the RabbitMQ source).
emit.js:
#!/usr/bin/env node
const XCHANGE = 'headers-exchange';
const Q = require('q');
const Broker = require('amqplib');
let scope = 'anonymous';
process.on('uncaughtException', (exception) => {
console.error(`"::ERROR:: Uncaught exception ${exception}`);
});
process.argv.slice(2).forEach((arg) =>
{
scope = arg;
console.info('[*] Scope now set to ' + scope);
});
Q.spawn(function*()
{
let conn = yield Broker.connect('amqp://root:root@localhost');
let chan = yield conn.createChannel();
chan.assertExchange(XCHANGE, 'headers', { durable: false });
for(let count=0;; count=++count%3)
{
let output = (new Date()).toString();
let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
chan.publish(XCHANGE, '', Buffer.from(output), opts);
console.log(`[x] Published item "${output}" to <${XCHANGE} : ${JSON.stringify(opts)}>`);
yield Q.delay(500);
}
});
receive.js:
#!/usr/bin/env node
const Q = require('q');
const Broker = require('amqplib');
const uuid = require('node-uuid');
const Rx = require('rx');
Rx.Node = require('rx-node');
const XCHANGE = 'headers-exchange';
const WORKER_ID = uuid.v4();
const WORKER_SHORT_ID = WORKER_ID.substr(0, 4);
Q.spawn(function*() {
let conn = yield Broker.connect('amqp://root:root@localhost');
let chan = yield conn.createChannel();
chan.assertExchange(XCHANGE, 'headers', { durable: false });
let q = yield chan.assertQueue('', { exclusive: true });
let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
chan.bindQueue(q.queue, XCHANGE, '', opts);
console.info('[*] Binding with ' + JSON.stringify(opts));
console.log(`[*] Subscriber ${WORKER_ID} (${WORKER_SHORT_ID}) is online!`);
chan.consume(q.queue, (msg) =>
{
console.info(`[x](${WORKER_SHORT_ID}) Received pub "${msg.content.toString()}"`);
chan.ack(msg);
});
});
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With