We're trying to consume Azure Service Bus in a Node application. Our requirement is to fetch multiple messages from a queue.
Since the Azure SDK for Node doesn't support batch retrieval, we decided to use AMQP. We're able to fetch messages using the Peek Message operation described here (https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response#message-operations).
However, we are noticing that as soon as messages are fetched, they are removed from the queue. Does anyone have insight into how we can fetch messages in "PeekLock" mode using AMQP and Node? For AMQP, we're using the amqp10 node package (https://www.npmjs.com/package/amqp10).
Here's our code for peeking at the messages:
const AMQPClient = require('amqp10/lib').Client,
      Policy = require('amqp10/lib').Policy;
const protocol = 'amqps';
const keyName = 'RootManageSharedAccessKey';
const sasKey = 'My Shared Access Key';
const serviceBusHost = 'account-name.servicebus.windows.net';
const uri = protocol + '://' + encodeURIComponent(keyName) + ':' + encodeURIComponent(sasKey) + '@' + serviceBusHost;
const queueName = 'test1';
var client = new AMQPClient(Policy.ServiceBusQueue);
client.connect(uri)
  .then(function () {
    return Promise.all([
      client.createReceiver(queueName),
      client.createSender(queueName)
    ]);
  })
  .spread(function (receiver, sender) {
    console.log(receiver);
    console.log(sender);
    console.log('--------------------------------------------------------------------------');
    receiver.on('errorReceived', function (err) {
      // check for errors
      console.log(err);
    });
    receiver.on('message', function (message) {
      console.log('Received message');
      console.log(message);
      console.log('------------------------------------');
    });
    return sender.send([], {
      operation: 'com.microsoft:peek-message',
      'message-count': 5
    });
  })
  .error(function (e) {
    console.warn('connection error: ', e);
  });
The Azure Service Bus cloud service uses AMQP 1.0 as its primary means of communication.
Open the queue. Select Service Bus Explorer (preview) under Settings. Enter a policy name, select the Listen checkbox, and click Create. Then select the policy name and copy the Primary or Secondary Connection String.
To peek messages, select Peek Mode in the Service Bus Explorer dropdown. Check the metrics to see whether there are Active Messages or Dead-lettered Messages to peek, and select either the Queue / Subscription or the DeadLetter sub-queue. Then select the Peek from start button.
By default, the receiver works in auto-settle mode; you have to change it to settle on disposition:
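const { Constants } = require('amqp10')
//
// ...create client, connect, etc...
//
// Second parameter of createReceiver method enables overwriting policy parameters.
// createReceiver returns a promise, so wait for it to resolve to the receiver link
// (use .then(...) instead if you are not inside an async function).
const receiver = await client.createReceiver(queueName, {
  attach: {
    rcvSettleMode: Constants.receiverSettleMode.settleOnDisposition
  }
})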
Do not forget to accept/reject/release a message after processing it:
receiver.on('message', msg => {
  //
  // ...do something smart with a message...
  //
  receiver.accept(msg) // <- manually settle a message
})
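To make the accept/release choice explicit, here is a minimal sketch of a listener wrapper. The name settleAfter and the decision to release on failure are my own assumptions for illustration, not part of amqp10; it only relies on the receiver exposing accept(msg) and release(msg), as the receiver link above does.

```javascript
// Hypothetical helper (not part of amqp10): wraps a processing function so
// that each message is settled once, after processing has finished.
function settleAfter(receiver, processMessage) {
  return async function onMessage(msg) {
    let succeeded = true;
    try {
      // processMessage may be synchronous or return a promise
      await processMessage(msg);
    } catch (err) {
      succeeded = false;
    }
    if (succeeded) {
      receiver.accept(msg);  // processing worked: settle the message as accepted
    } else {
      receiver.release(msg); // processing failed: hand the message back to the queue
    }
  };
}
```

You would attach it with something like receiver.on('message', settleAfter(receiver, msg => { /* your processing */ })). Whether a failed message should be released or rejected depends on whether you want it redelivered, so treat the release branch as a placeholder.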