
How to fetch messages from an Azure Service Bus Queue in "PeekLock" mode using AMQP?

We're trying to consume messages from an Azure Service Bus queue in a Node application. Our requirement is to fetch multiple messages from the queue.

Since the Azure SDK for Node doesn't support batch retrieval, we decided to use AMQP. We're able to fetch messages using the Peek Message operation as described here (https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response#message-operations).

However, we're noticing that as soon as the messages are fetched, they are removed from the queue. Does anyone have insight into how we can fetch messages in "PeekLock" mode using AMQP and Node? For AMQP, we're using the amqp10 Node package (https://www.npmjs.com/package/amqp10).

Here's our code for peeking at the messages:

const AMQPClient = require('amqp10/lib').Client;
const Policy = require('amqp10/lib').Policy;

const protocol = 'amqps';
const keyName = 'RootManageSharedAccessKey';
const sasKey = 'My Shared Access Key';
const serviceBusHost = 'account-name.servicebus.windows.net';
const uri = protocol + '://' + encodeURIComponent(keyName) + ':' + encodeURIComponent(sasKey) + '@' + serviceBusHost;
const queueName = 'test1';
var client = new AMQPClient(Policy.ServiceBusQueue);
client.connect(uri)
.then(function () {
    return Promise.all([
        client.createReceiver(queueName),
        client.createSender(queueName)
    ]);
})
.spread(function(receiver, sender) {
    console.log(receiver);
    console.log(sender);
    console.log('--------------------------------------------------------------------------');
    receiver.on('errorReceived', function(err) {
        // check for errors
        console.log(err);
    });
    receiver.on('message', function(message) {
        console.log('Received message');
        console.log(message);
        console.log('------------------------------------');
    });

    return sender.send([], {
        operation: 'com.microsoft:peek-message',
        'message-count': 5
    });
})
.error(function (e) {
    console.warn('connection error: ', e);
});
Jagrati Modi asked Apr 12 '17 05:04



1 Answer

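By default, the receiver works in auto-settle mode, so each message is settled as soon as it is delivered, which is why it disappears from the queue. You have to change the receiver to settle on disposition: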

const { Constants } = require('amqp10')

// 
// ...create client, connect, etc...
//

// The second parameter of createReceiver overrides policy parameters.
// createReceiver returns a promise, so await it inside an async function (or use .then()).
const receiver = await client.createReceiver(queueName, {
  attach: {
    rcvSettleMode: Constants.receiverSettleMode.settleOnDisposition
  }
})

Do not forget to accept, reject, or release each message after processing it:

receiver.on('message', msg => {
  //
  // ...do something smart with a message...
  //

  receiver.accept(msg) // <- manually settle a message
})
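
If processing can fail, it may help to make the settlement decision in one place. Here is a minimal sketch of that idea, not part of amqp10 itself: settleAfterProcessing and processMessage are hypothetical names, and receiver is assumed to be a link created with settleOnDisposition as shown above. It accepts a message when processing succeeds and releases it otherwise; as far as I understand, Service Bus treats an accepted message as completed and makes a released one available for delivery again.

```javascript
// Hypothetical helper (not an amqp10 API): wraps an application callback so
// that every message is explicitly settled exactly once.
// `receiver` is assumed to expose accept(message) and release(message),
// as an amqp10 receiver link does.
function settleAfterProcessing(receiver, processMessage) {
  return function onMessage(message) {
    return Promise.resolve()
      .then(() => processMessage(message)) // may be sync or return a promise
      .then(
        () => {
          receiver.accept(message); // processing succeeded: settle as accepted
          return 'accepted';
        },
        (err) => {
          console.error('processing failed:', err.message);
          receiver.release(message); // processing failed: hand the message back
          return 'released';
        }
      );
  };
}
```

It would be wired up with receiver.on('message', settleAfterProcessing(receiver, handleMessage)), where handleMessage is your own function. Using receiver.reject(msg) instead of release is an alternative if a failed message should not be retried.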
qzb answered Oct 06 '22 04:10