diff --git a/src/ChannelWrapper.ts b/src/ChannelWrapper.ts index f0c26ea..ba28e35 100644 --- a/src/ChannelWrapper.ts +++ b/src/ChannelWrapper.ts @@ -759,7 +759,7 @@ export default class ChannelWrapper extends EventEmitter { if (!msg) { consumer.consumerTag = null; this._reconnectConsumer(consumer).catch((err) => { - if (err.isOperational && err.message.includes('BasicConsume; 404')) { + if (err.code === 404) { // Ignore errors caused by queue not declared. In // those cases the connection will reconnect and // then consumers reestablished. The full reconnect @@ -767,7 +767,7 @@ export default class ChannelWrapper extends EventEmitter { // before starting to consume. return; } - throw err; + this.emit('error', err); }); return; } diff --git a/test/integrationTest.ts b/test/integrationTest.ts index 4032b20..1cbf6de 100644 --- a/test/integrationTest.ts +++ b/test/integrationTest.ts @@ -298,4 +298,33 @@ describe('Integration tests', () => { await rpcClient.close(); await rpcServer.close(); }); + + it('should reconnect consumer after queue deletion', async function () { + const queueName = 'testQueue'; + + connection = new AmqpConnectionManager('amqp://localhost', { reconnectTimeInSeconds: 0.5 }); + const channelWrapper = connection.createChannel({ + confirm: true, + setup: async (channel: Channel) => { + await channel.assertQueue(queueName, { durable: false, autoDelete: true }); + }, + }); + + const result = defer(); + await channelWrapper.consume(queueName, (msg) => { + result.resolve(msg.content.toString()); + }); + + await Promise.all([connection.connect(), once(channelWrapper, 'connect')]); + + // The deleted queue should cause a reconnect + await channelWrapper.deleteQueue(queueName); + + // Await all setup functions to run before sending a message + await once(channelWrapper, 'connect'); + await channelWrapper.sendToQueue(queueName, 'hello'); + + const content = await result.promise; + expect(content).to.equal('hello'); + }); });