Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Headers exchange example using RabbitMQ in Node.js

I have been looking everywhere for a headers exchange example using RabbitMQ in Node.js. If someone could point me in the right direction, that would be great. Here's what I have so far:

publisher method (create a publisher)

RabbitMQ.prototype.publisher = function(exchange, type) {
 console.log('New publisher, exchange: '+exchange+', type: '+type);
 amqp.then(function(conn) {
    conn.createConfirmChannel().then(function(ch) {
        publishers[exchange] = {};
        publishers[exchange].assert = ch.assertExchange(exchange, type, {durable: true});
        publishers[exchange].ch = ch;
    });
 },function(err){
    console.error("[AMQP]", err.message);
    return setTimeout(function(){
        self.connect(URI);
    }, 1000);
 }).then(null, console.log);
};

publish method

RabbitMQ.prototype.publish = function(exchange, routingKey, content, headers) {
 try {    
    publishers[exchange].assert.then(function(){
        publishers[exchange].ch.publish(exchange, routingKey, new Buffer(content), { persistent: true, headers: headers }, function(err, ok) {
            if (err) {
                console.error("[AMQP] publish", err);
                offlinePubQueue.push([exchange, routingKey, content]);
                publishers[exchange].ch.connection.close();
            }
        });
    });
 } catch (e) {                                                                                                                               
    console.error("[AMQP] publish", e.message);
    offlinePubQueue.push([exchange, routingKey, content]);
 }
};

consumer method (create a consumer)

RabbitMQ.prototype.consumer = function(exchange, type, routingKey, cb) {
 amqp.then(function(conn) {
  conn.createChannel().then(function(ch) {

    var ok = ch.assertExchange(exchange, type, {durable: true});

    ok.then(function() {
      ch.assertQueue('', {exclusive: true});
    });

    ok = ok.then(function(qok) {
      var queue = qok.queue;
      ch.bindQueue(queue,exchange,routingKey)
    });

    ok = ok.then(function(queue) {
      ch.consume(queue, function(msg){
            cb(msg,ch);
      }, {noAck: false});
    });

    ok.then(function() {
      console.log(' [*] Waiting for logs. To exit press CTRL+C.');
    });

  });
 }).then(null, console.warn);
};

The above example works fine with topics, but I'm not sure how to make the transition to headers. I am pretty sure I need to change my binding approach, but haven't been able to find any examples on how exactly to accomplish this.

Any help would be greatly appreciated!

like image 257
user1828780 Avatar asked Dec 10 '15 18:12

user1828780


1 Answers

I stumbled across this question looking for the same answers for amqplib. Unfortunately, like you I found all available documentation lacking. After looking over the source, reading over the protocol a bit, and trying out a few combinations, this finally did it for me.

...
let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
chan.publish(XCHANGE, '', Buffer.from(output), opts);
...

...
let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
chan.bindQueue(q.queue, XCHANGE, '', opts);
...

The full working code is below. The auth info below is faked, so you'll have to use your own. I'm also using ES6, nodejs version 6.5, and amqplib. There may be an issue with giving your headers x- prefixes and/or using reserved words as header names, but I'm not too sure (I'd have to see the RabbitMQ source).

emit.js:

#!/usr/bin/env node

const XCHANGE = 'headers-exchange';

const Q      = require('q');
const Broker = require('amqplib');

let scope = 'anonymous';

process.on('uncaughtException', (exception) => {
    console.error(`"::ERROR:: Uncaught exception ${exception}`);
});

process.argv.slice(2).forEach((arg) =>
{
    scope = arg;
    console.info('[*] Scope now set to ' + scope);
});

Q.spawn(function*()
{
    let conn = yield Broker.connect('amqp://root:root@localhost');
    let chan = yield conn.createChannel();

    chan.assertExchange(XCHANGE, 'headers', { durable: false });

    for(let count=0;; count=++count%3)
    {
        let output = (new Date()).toString();
        let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
        chan.publish(XCHANGE, '', Buffer.from(output), opts);
        console.log(`[x] Published item "${output}" to <${XCHANGE} : ${JSON.stringify(opts)}>`);

        yield Q.delay(500);
    }
});

receive.js:

#!/usr/bin/env node

const Q      = require('q');
const Broker = require('amqplib');
const uuid   = require('node-uuid');
const Rx     = require('rx');

Rx.Node = require('rx-node');

const XCHANGE = 'headers-exchange';
const WORKER_ID = uuid.v4();
const WORKER_SHORT_ID = WORKER_ID.substr(0, 4);

Q.spawn(function*() {
    let conn = yield Broker.connect('amqp://root:root@localhost');
    let chan = yield conn.createChannel();

    chan.assertExchange(XCHANGE, 'headers', { durable: false });

    let q = yield chan.assertQueue('', { exclusive: true });
    let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };

    chan.bindQueue(q.queue, XCHANGE, '', opts);
    console.info('[*] Binding with ' + JSON.stringify(opts));

    console.log(`[*] Subscriber ${WORKER_ID} (${WORKER_SHORT_ID}) is online!`);

    chan.consume(q.queue, (msg) =>
    {
        console.info(`[x](${WORKER_SHORT_ID}) Received pub "${msg.content.toString()}"`);
        chan.ack(msg);
    });
});
like image 95
Xunnamius Avatar answered Sep 28 '22 19:09

Xunnamius