RabbitMQ 3.5 and Message Priority




RabbitMQ 3.5 now supports message priority; however, I am unable to build a working example. I've placed my code below, along with the output I expect and the output I actually get. I'd be interested in more documentation and/or a working example.

So my question in short: How do I get message priority to work in RabbitMQ 3.5?


using System;
using RabbitMQ.Client;
using System.Text;
using System.Collections.Generic;

class Publisher
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                IDictionary<String, Object> args = new Dictionary<String, Object>();
                args.Add(" x-max-priority ", 10);
                channel.QueueDeclare("task_queue1", true, false, true, args);

                for (int i = 1; i <= 10; i++)
                {
                    var message = "Message";
                    var body = Encoding.UTF8.GetBytes(message + " " + i);
                    var properties = channel.CreateBasicProperties();
                    properties.Priority = Convert.ToByte(i);
                    channel.BasicPublish("", "task_queue1", properties, body);
                }
            }
        }
    }
}


using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
using System.Collections.Generic;

namespace Consumer
{
    class Worker
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    IDictionary<String, Object> args = new Dictionary<String, Object>();
                    channel.BasicQos(0, 1, false);
                    var consumer = new QueueingBasicConsumer(channel);
                    IDictionary<string, object> consumerArgs = new Dictionary<string, object>();
                    channel.BasicConsume("task_queue1", false, "", args, consumer);
                    Console.WriteLine(" [*] Waiting for messages. " +
                                      "To exit press CTRL+C");
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(" [x] Received {0}", message);
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                }
            }
        }
    }
}

Actual output:

[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 1
[x] Received Message 2
[x] Received Message 3
[x] Received Message 4
[x] Received Message 5
[x] Received Message 6
[x] Received Message 7
[x] Received Message 8
[x] Received Message 9
[x] Received Message 10

Expected output:

[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 10
[x] Received Message 9
[x] Received Message 8
[x] Received Message 7
[x] Received Message 6
[x] Received Message 5
[x] Received Message 4
[x] Received Message 3
[x] Received Message 2
[x] Received Message 1

UPDATE #1. I found an example in Java here. However, it is for the RabbitMQ 3.4.x.x add-in that was incorporated into 3.5. The only difference I can see is that they express the priority as an int and mine is a byte, but I feel like that's a red herring. I'm at a bit of a loss here.

2 Answers

Well I solved it. It was a dumb mistake. I wrote:

args.Add(" x-max-priority ", 10);

It should have been

args.Add("x-max-priority", 10);

I'll leave this up so other people can have a working example of RabbitMQ 3.5's priority queues in C#.
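Typos like this are easy to miss because the declare call apparently still succeeds; the broker just never receives an argument named exactly x-max-priority, so the queue behaves like an ordinary FIFO queue, which matches the output in the question. As a rough sketch, written in JavaScript to match the Node.js answer in this thread, a small guard can flag padded keys before the queue is declared. Note that findPaddedKeys is a hypothetical helper invented for this illustration, not part of any RabbitMQ client library.

```javascript
// Hypothetical helper (not part of any RabbitMQ client): returns the keys of a
// queue-arguments object that carry leading or trailing whitespace.
function findPaddedKeys(queueArgs) {
  return Object.keys(queueArgs).filter(function (key) {
    return key !== key.trim();
  });
}

var broken = { ' x-max-priority ': 10 }; // the key from the question
var fixed = { 'x-max-priority': 10 };    // the corrected key

console.log(findPaddedKeys(broken).length); // 1
console.log(findPaddedKeys(fixed).length);  // 0
```

Running a check like this right before the declare call would have surfaced the stray spaces immediately.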

A similar RabbitMQ priority queue implementation in Node.js

Install amqplib

To test this, amqplib must be installed:

npm install amqplib

Publisher (send.js)

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

// Log the error, close the connection if there is one, and exit.
function bail(err, conn) {
  console.error(err);
  if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
  if (err !== null) return bail(err);

  // name of queue
  var q = 'hello';
  var msg = 'Hello World!';
  var priorityValue = 0;

  function on_channel_open(err, ch) {
    if (err !== null) return bail(err, conn);
    // maxPriority : max priority value supported by queue
    ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) {
      if (err !== null) return bail(err, conn);

      for (var index = 1; index <= 100; index++) {
        // random priority between 0 and 9
        priorityValue = Math.floor((Math.random() * 10));
        msg = 'Hello World!' + ' ' + index + ' ' + priorityValue;
        ch.publish('', q, Buffer.from(msg), {priority: priorityValue});
        console.log(" [x] Sent '%s'", msg);
      }

      ch.close(function() { conn.close(); });
    });
  }

  conn.createChannel(on_channel_open);
}

amqp.connect(on_connect);



Subscriber (receive.js)

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

// Log the error, close the connection if there is one, and exit.
function bail(err, conn) {
  console.error(err);
  if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
  if (err !== null) return bail(err);
  process.once('SIGINT', function() { conn.close(); });

  var q = 'hello';

  function on_channel_open(err, ch) {
    if (err !== null) return bail(err, conn);
    // must match the arguments used by the publisher
    ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) {
      if (err !== null) return bail(err, conn);
      ch.consume(q, function(msg) { // message callback
        console.log(" [x] Received '%s'", msg.content.toString());
      }, {noAck: true}, function(_consumeOk) { // consume callback
        console.log(' [*] Waiting for messages. To exit press CTRL+C');
      });
    });
  }

  conn.createChannel(on_channel_open);
}

amqp.connect(on_connect);



node send.js

It will create a queue named 'hello' and flood it with 100 sample messages, each with a random priority, using the default AMQP exchange.
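Once the messages are sitting in the queue, the broker is documented to hand them out highest priority first, with publish order preserved among messages of equal priority; messages without a priority count as 0, and priorities above the queue maximum are treated as the maximum. The following is only an in-memory model of that ordering to show what to expect, not broker code, and deliveryOrder is a name made up for this sketch.

```javascript
// In-memory model of priority-queue delivery order (illustration only).
// `messages` is an array of { body, priority } in publish order.
function deliveryOrder(messages, maxPriority) {
  return messages
    .map(function (m, seq) {
      var p = m.priority === undefined ? 0 : m.priority; // no priority => 0
      return { body: m.body, priority: Math.min(p, maxPriority), seq: seq };
    })
    .sort(function (a, b) {
      // Higher priority first; publish order breaks ties.
      return (b.priority - a.priority) || (a.seq - b.seq);
    })
    .map(function (m) { return m.body; });
}

// Same shape as the C# question: ten messages with priorities 1 through 10.
var published = [];
for (var i = 1; i <= 10; i++) {
  published.push({ body: 'Message ' + i, priority: i });
}
console.log(deliveryOrder(published, 10).slice(0, 3).join(', '));
// Message 10, Message 9, Message 8
```

This ordering only applies to messages that are actually waiting in the queue. If a consumer is already attached while the publisher runs, messages may be delivered as they arrive and never get a chance to be reordered.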

node receive.js

It will act as a consumer and subscribe to the messages waiting in the queue.
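To confirm that priorities took effect, the received bodies can be checked for non-increasing priority. This is a minimal sketch that assumes the "Hello World! <index> <priority>" message format used by send.js above and that all messages were queued before the consumer started; isPriorityOrdered is a hypothetical helper, not something amqplib provides.

```javascript
// Hypothetical checker: given message bodies of the form
// "Hello World! <index> <priority>", report whether priorities never increase.
function isPriorityOrdered(bodies) {
  var priorities = bodies.map(function (body) {
    var parts = body.trim().split(' ');
    return Number(parts[parts.length - 1]); // last token is the priority
  });
  return priorities.every(function (p, i) {
    return i === 0 || priorities[i - 1] >= p;
  });
}

console.log(isPriorityOrdered([
  'Hello World! 2 9',
  'Hello World! 7 9',
  'Hello World! 1 4'
])); // true

console.log(isPriorityOrdered([
  'Hello World! 1 4',
  'Hello World! 7 9'
])); // false
```

In receive.js, the bodies could be collected into an array inside the message callback and passed to this function once the queue has drained.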

