From 60700eebcac6f1ce985a34da59f216481473780a Mon Sep 17 00:00:00 2001 From: Mathias Lundell Date: Mon, 24 Oct 2022 13:31:59 +0200 Subject: [PATCH] fix: error thrown when queue deleted in amqplib 0.10.0 Fixes #301 --- src/ChannelWrapper.ts | 4 ++-- test/integrationTest.ts | 29 +++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/src/ChannelWrapper.ts b/src/ChannelWrapper.ts index f0c26ea6..ba28e353 100644 --- a/src/ChannelWrapper.ts +++ b/src/ChannelWrapper.ts @@ -759,7 +759,7 @@ export default class ChannelWrapper extends EventEmitter { if (!msg) { consumer.consumerTag = null; this._reconnectConsumer(consumer).catch((err) => { - if (err.isOperational && err.message.includes('BasicConsume; 404')) { + if (err.code === 404) { // Ignore errors caused by queue not declared. In // those cases the connection will reconnect and // then consumers reestablished. The full reconnect @@ -767,7 +767,7 @@ export default class ChannelWrapper extends EventEmitter { // before starting to consume. return; } - throw err; + this.emit('error', err); }); return; } diff --git a/test/integrationTest.ts b/test/integrationTest.ts index 4032b20d..1cbf6de3 100644 --- a/test/integrationTest.ts +++ b/test/integrationTest.ts @@ -298,4 +298,33 @@ describe('Integration tests', () => { await rpcClient.close(); await rpcServer.close(); }); + + it('should reconnect consumer after queue deletion', async function () { + const queueName = 'testQueue'; + + connection = new AmqpConnectionManager('amqp://localhost', { reconnectTimeInSeconds: 0.5 }); + const channelWrapper = connection.createChannel({ + confirm: true, + setup: async (channel: Channel) => { + await channel.assertQueue(queueName, { durable: false, autoDelete: true }); + }, + }); + + const result = defer(); + await channelWrapper.consume(queueName, (msg) => { + result.resolve(msg.content.toString()); + }); + + await Promise.all([connection.connect(), once(channelWrapper, 'connect')]); + + // The deleted queue should cause a reconnect + await channelWrapper.deleteQueue(queueName); + + // Await all setup functions to run before sending a message + await once(channelWrapper, 'connect'); + await channelWrapper.sendToQueue(queueName, 'hello'); + + const content = await result.promise; + expect(content).to.equal('hello'); + }); });