Skip to content

Commit

Permalink
Merge pull request #303 from luddd3/fix-reconnect-consumer
Browse files Browse the repository at this point in the history
fix: error thrown when queue deleted in amqplib 0.10.0
  • Loading branch information
jwalton committed Oct 24, 2022
2 parents 6aa1783 + 60700ee commit 7a430ef
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/ChannelWrapper.ts
Expand Up @@ -759,15 +759,15 @@ export default class ChannelWrapper extends EventEmitter {
if (!msg) {
consumer.consumerTag = null;
this._reconnectConsumer(consumer).catch((err) => {
if (err.isOperational && err.message.includes('BasicConsume; 404')) {
if (err.code === 404) {
// Ignore errors caused by queue not declared. In
// those cases the connection will reconnect and
// then consumers reestablished. The full reconnect
// might be avoided if we assert the queue again
// before starting to consume.
return;
}
throw err;
this.emit('error', err);
});
return;
}
Expand Down
29 changes: 29 additions & 0 deletions test/integrationTest.ts
Expand Up @@ -298,4 +298,33 @@ describe('Integration tests', () => {
await rpcClient.close();
await rpcServer.close();
});

it('should reconnect consumer after queue deletion', async function () {
const queueName = 'testQueue';

connection = new AmqpConnectionManager('amqp://localhost', { reconnectTimeInSeconds: 0.5 });
const channelWrapper = connection.createChannel({
confirm: true,
setup: async (channel: Channel) => {
await channel.assertQueue(queueName, { durable: false, autoDelete: true });
},
});

const result = defer<string>();
await channelWrapper.consume(queueName, (msg) => {
result.resolve(msg.content.toString());
});

await Promise.all([connection.connect(), once(channelWrapper, 'connect')]);

// The deleted queue should cause a reconnect
await channelWrapper.deleteQueue(queueName);

// Await all setup functions to run before sending a message
await once(channelWrapper, 'connect');
await channelWrapper.sendToQueue(queueName, 'hello');

const content = await result.promise;
expect(content).to.equal('hello');
});
});

0 comments on commit 7a430ef

Please sign in to comment.