Comment récupérer des messages d’une queue Azure Service Bus en mode «PeekLock» à l’aide d’AMQP?

Nous essayons de consumr Azure Service Bus dans une application de nœud. Notre exigence est d’extraire plusieurs messages d’une queue .

Azure SDK for Node ne prenant pas en charge la récupération par lots, nous avons décidé d’utiliser AMQP. Bien que nous puissions récupérer les messages à l’aide de Peek Messages comme décrit ici ( https://docs.microsoft.com/en-us/azuree/service-bus-messaging/service-bus-amqp-request-response#message-operations )

Ce que nous remarquons, c’est que dès que les messages sont récupérés, ils sont retirés de la queue. Je me demande si quelqu’un a une idée de la façon dont nous pouvons récupérer des messages en mode “PeekLock” en utilisant AMQP et Node. Pour AMQP, nous utilisons le package de nœud amqp10 ( https://www.npmjs.com/package/amqp10 ).

Voici notre code pour regarder les messages:

const AMQPClient = require('amqp10/lib').Client, Policy = require('amqp10/lib').Policy; const protocol = 'amqps'; const keyName = 'RootManageSharedAccessKey'; const sasKey = 'My Shared Access Key' const serviceBusHost = 'account-name.servicebus.windows.net'; const uri = protocol + '://' + encodeURIComponent(keyName) + ':' + encodeURIComponent(sasKey) + '@' + serviceBusHost; const queueName = 'test1'; var client = new AMQPClient(Policy.ServiceBusQueue); client.connect(uri) .then(function () { return Promise.all([ client.createReceiver(queueName), client.createSender(queueName) ]); }) .spread(function(receiver, sender) { console.log(receiver); console.log(sender); console.log('--------------------------------------------------------------------------'); receiver.on('errorReceived', function(err) { // check for errors console.log(err); }); receiver.on('message', function(message) { console.log('Received message'); console.log(message); console.log('------------------------------------'); }); return sender.send([], { operation: 'com.microsoft:peek-message', 'message-count': 5 }); }) .error(function (e) { console.warn('connection error: ', e); }); 

Par défaut, le récepteur fonctionne en mode de règlement automatique, vous devez le modifier pour régler la disposition :

 const { Constants } = require('amqp10') // // ...create client, connect, etc... // // Second parameter of createReceiver method enables overwriting policy parameters const receiver = client.createReceiver(queueName, { attach: { rcvSettleMode: Constants.receiverSettleMode.settleOnDisposition } }) 

N’oubliez pas d’accepter / rejeter / libérer un message après l’avoir traité:

 receiver.on('message', msg => { // // ...do something smart with a message... // receiver.accept(msg) // <- manually settle a message })