From 73ae294942284ba1c47b387197e699f621d325bb Mon Sep 17 00:00:00 2001 From: Josh Wheeler Date: Wed, 12 Oct 2022 15:01:25 -0500 Subject: [PATCH 1/2] Add support for dead letter queues --- packages/queues/src/broker.ts | 79 +++++++++++++++++++---------- packages/queues/src/plugin.ts | 3 ++ packages/queues/test/broker.spec.ts | 78 +++++++++++++++++++++++++--- packages/shared/src/queues.ts | 2 + 4 files changed, 129 insertions(+), 33 deletions(-) diff --git a/packages/queues/src/broker.ts b/packages/queues/src/broker.ts index 35fcfcab6..69c658624 100644 --- a/packages/queues/src/broker.ts +++ b/packages/queues/src/broker.ts @@ -18,8 +18,9 @@ export type QueueErrorCode = "ERR_CONSUMER_ALREADY_SET"; export class QueueError extends MiniflareError {} -export const MAX_ATTEMPTS = 3; -const kShouldAttemptRetry = Symbol("kShouldAttemptRetry"); +const kGetPendingRetry = Symbol("kGetPendingRetry"); +const kPrepareForRetry = Symbol("kPrepareForRetry"); +const kGetFailedAttempts = Symbol("kGetFailedAttempts"); export class Message implements MessageInterface { readonly body: Body; @@ -48,24 +49,17 @@ export class Message implements MessageInterface { this.#pendingRetry = true; } - [kShouldAttemptRetry](): boolean { - if (!this.#pendingRetry) { - return false; - } - + [kPrepareForRetry]() { + this.#pendingRetry = false; this.#failedAttempts++; - if (this.#failedAttempts >= MAX_ATTEMPTS) { - this.#log?.warn( - `Dropped message "${this.id}" after ${ - this.#failedAttempts - } failed attempts!` - ); - return false; - } + } - this.#log?.debug(`Retrying message "${this.id}"...`); - this.#pendingRetry = false; - return true; + [kGetPendingRetry](): boolean { + return this.#pendingRetry; + } + + [kGetFailedAttempts](): number { + return this.#failedAttempts; } } @@ -96,6 +90,7 @@ enum FlushType { export const kSetFlushCallback = Symbol("kSetFlushCallback"); export class Queue implements QueueInterface { + readonly #broker: QueueBroker; readonly #queueName: string; readonly #log?: Log; @@ -109,7 +104,8 @@ export class Queue implements QueueInterface { // A callback to run after a flush() has been executed: useful for testing. #flushCallback?: () => void; - constructor(queueName: string, log?: Log) { + constructor(broker: QueueBroker, queueName: string, log?: Log) { + this.#broker = broker; this.#queueName = queueName; this.#log = log; @@ -202,6 +198,8 @@ export class Queue implements QueueInterface { if (!this.#consumer) { return; } + const maxAttempts = this.#consumer.maxRetries + 1; + const deadLetterQueue = this.#consumer.deadLetterQueue; // Create a batch and execute the queue event handler const batch = new MessageBatch(this.#queueName, [...this.#messages]); @@ -216,12 +214,41 @@ export class Queue implements QueueInterface { // Reset state and check for any messages to retry this.#pendingFlush = FlushType.NONE; this.#timeout = undefined; - const messagesToRetry = batch.messages.filter((msg) => - msg[kShouldAttemptRetry]() - ); - this.#messages.push(...messagesToRetry); - if (this.#messages.length > 0) { - this.#ensurePendingFlush(); + + const toRetry: Message[] = []; + const toDLQ: Message[] = []; + batch.messages.forEach((msg) => { + if (!msg[kGetPendingRetry]()) { + return; + } + + msg[kPrepareForRetry](); + if (msg[kGetFailedAttempts]() < maxAttempts) { + this.#log?.debug(`Retrying message "${msg.id}"...`); + toRetry.push(msg); + } else if (deadLetterQueue) { + this.#log?.warn( + `Moving message "${msg.id}" to dead letter queue "${deadLetterQueue}"...` + ); + toDLQ.push(msg); + } else { + this.#log?.warn( + `Dropped message "${msg.id}" after ${maxAttempts} failed attempts!` + ); + } + }); + + if (toRetry.length) { + this.#messages.push(...toRetry); + if (this.#messages.length > 0) { + this.#ensurePendingFlush(); + } + } + + if (deadLetterQueue) { + toDLQ.forEach((msg) => { + this.#broker.getOrCreateQueue(deadLetterQueue).send(msg.body); + }); } } @@ -242,7 +269,7 @@ export class QueueBroker implements QueueBrokerInterface { getOrCreateQueue(name: string): Queue { let queue = this.#queues.get(name); if (queue === undefined) { - this.#queues.set(name, (queue = new Queue(name, this.#log))); + this.#queues.set(name, (queue = new Queue(this, name, this.#log))); } return queue; } diff --git a/packages/queues/src/plugin.ts b/packages/queues/src/plugin.ts index fdb3e96dd..ad05dab8b 100644 --- a/packages/queues/src/plugin.ts +++ b/packages/queues/src/plugin.ts @@ -11,6 +11,7 @@ import { export const DEFAULT_BATCH_SIZE = 5; export const DEFAULT_WAIT_MS = 1000; +export const DEFAULT_RETRIES = 2; export interface BindingOptions { name: string; @@ -106,6 +107,8 @@ export class QueuesPlugin queueName: opts.queueName, maxBatchSize: opts.maxBatchSize ?? DEFAULT_BATCH_SIZE, maxWaitMs: opts.maxWaitMs ?? DEFAULT_WAIT_MS, + maxRetries: opts.maxRetries ?? DEFAULT_RETRIES, + deadLetterQueue: opts.deadLetterQueue, dispatcher: this.ctx.queueEventDispatcher, }; diff --git a/packages/queues/test/broker.spec.ts b/packages/queues/test/broker.spec.ts index c3a6778b2..12ba5b62d 100644 --- a/packages/queues/test/broker.spec.ts +++ b/packages/queues/test/broker.spec.ts @@ -1,8 +1,4 @@ -import { - MAX_ATTEMPTS, - QueueBroker, - kSetFlushCallback, -} from "@miniflare/queues"; +import { QueueBroker, kSetFlushCallback } from "@miniflare/queues"; import { Consumer, LogLevel, @@ -19,6 +15,7 @@ test("QueueBroker: flushes partial batches", async (t) => { queueName: "myQueue", maxBatchSize: 5, maxWaitMs: 1, + maxRetries: 2, dispatcher: async (_batch) => {}, }; q[kSetConsumer](sub); @@ -109,6 +106,7 @@ test("QueueBroker: flushes full batches", async (t) => { queueName: "myQueue", maxBatchSize: 5, maxWaitMs: 1, + maxRetries: 2, dispatcher: async (_batch) => {}, }; q[kSetConsumer](sub); @@ -193,6 +191,7 @@ test("QueueBroker: supports message retry()", async (t) => { queueName: "myQueue", maxBatchSize: 5, maxWaitMs: 1, + maxRetries: 2, dispatcher: async (_batch) => {}, }; q[kSetConsumer](sub); @@ -241,6 +240,7 @@ test("QueueBroker: automatic retryAll() on consumer error", async (t) => { queueName: "myQueue", maxBatchSize: 5, maxWaitMs: 1, + maxRetries: 2, dispatcher: async (_batch) => {}, }; q[kSetConsumer](sub); @@ -299,6 +299,7 @@ test("QueueBroker: drops messages after max retry()", async (t) => { queueName: "myQueue", maxBatchSize: 5, maxWaitMs: 1, + maxRetries: 4, dispatcher: async (_batch) => {}, }; q[kSetConsumer](sub); @@ -312,7 +313,7 @@ test("QueueBroker: drops messages after max retry()", async (t) => { // Expect the queue to flush() the maximum number of times q.send("message1"); - for (let i = 0; i < MAX_ATTEMPTS; i++) { + for (let i = 0; i < 5; i++) { const prom = new Promise((resolve) => { q[kSetFlushCallback](() => resolve()); }); @@ -323,7 +324,7 @@ test("QueueBroker: drops messages after max retry()", async (t) => { // Check last log message is warning that message dropped t.deepEqual(log.logs[log.logs.length - 1], [ LogLevel.WARN, - 'Dropped message "myQueue-0" after 3 failed attempts!', + 'Dropped message "myQueue-0" after 5 failed attempts!', ]); // To check that "message1" is dropped: @@ -338,3 +339,66 @@ test("QueueBroker: drops messages after max retry()", async (t) => { }); await prom; }); + +test("QueueBroker: dead letter queue support", async (t) => { + const log = new TestLog(); + log.error = (message) => + log.logWithLevel(LogLevel.ERROR, message?.stack ?? ""); + + const broker = new QueueBroker(log); + + // Setup the original queue + const q = broker.getOrCreateQueue("myQueue"); + const originalConsumer: Consumer = { + queueName: "myQueue", + maxBatchSize: 5, + maxWaitMs: 1, + maxRetries: 1, + deadLetterQueue: "myDLQ", + dispatcher: async (_batch) => {}, + }; + q[kSetConsumer](originalConsumer); + + const dlq = broker.getOrCreateQueue("myDLQ"); + const dlqConsumer: Consumer = { + queueName: "myDLQ", + maxBatchSize: 5, + maxWaitMs: 1, + maxRetries: 0, + dispatcher: async (_batch) => {}, + }; + dlq[kSetConsumer](dlqConsumer); + + // Set up the consumer for the original queue + let originalInvocations = 0; + originalConsumer.dispatcher = async (batch: MessageBatch) => { + batch.messages[0].retry(); + originalInvocations++; + }; + + // Set up the consumer for the dead letter queue + let dlqInvocations = 0; + dlqConsumer.dispatcher = async (_batch: MessageBatch) => { + dlqInvocations++; + }; + + const originalQProm = new Promise((resolve) => { + q[kSetFlushCallback](() => resolve()); + }); + q.send("message1"); + await originalQProm; + + const dlqProm = new Promise((resolve) => { + dlq[kSetFlushCallback](() => resolve()); + }); + await dlqProm; + + t.deepEqual(originalInvocations, 2); + t.deepEqual(dlqInvocations, 1); + + // Check last log message is warning that message dropped + t.deepEqual(log.logs[log.logs.length - 1], [ + LogLevel.WARN, + 'Moving message "myQueue-0" to dead letter queue "myDLQ"...', + ]); +}); diff --git a/packages/shared/src/queues.ts b/packages/shared/src/queues.ts index 187cc69b8..52db478a3 100644 --- a/packages/shared/src/queues.ts +++ b/packages/shared/src/queues.ts @@ -15,6 +15,8 @@ export interface Consumer { queueName: string; maxBatchSize: number; maxWaitMs: number; + maxRetries: number; + deadLetterQueue?: string; dispatcher: QueueEventDispatcher; } From 36cd45ab5b0eb0603993e784ab83df21826d69b9 Mon Sep 17 00:00:00 2001 From: Josh Wheeler Date: Fri, 14 Oct 2022 10:21:36 -0500 Subject: [PATCH 2/2] fixup! Add support for dead letter queues --- packages/queues/src/broker.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/queues/src/broker.ts b/packages/queues/src/broker.ts index 69c658624..739264b25 100644 --- a/packages/queues/src/broker.ts +++ b/packages/queues/src/broker.ts @@ -199,7 +199,7 @@ export class Queue implements QueueInterface { return; } const maxAttempts = this.#consumer.maxRetries + 1; - const deadLetterQueue = this.#consumer.deadLetterQueue; + const deadLetterQueueName = this.#consumer.deadLetterQueue; // Create a batch and execute the queue event handler const batch = new MessageBatch(this.#queueName, [...this.#messages]); @@ -226,9 +226,9 @@ export class Queue implements QueueInterface { if (msg[kGetFailedAttempts]() < maxAttempts) { this.#log?.debug(`Retrying message "${msg.id}"...`); toRetry.push(msg); - } else if (deadLetterQueue) { + } else if (deadLetterQueueName) { this.#log?.warn( - `Moving message "${msg.id}" to dead letter queue "${deadLetterQueue}"...` + `Moving message "${msg.id}" to dead letter queue "${deadLetterQueueName}"...` ); toDLQ.push(msg); } else { @@ -240,14 +240,14 @@ export class Queue implements QueueInterface { if (toRetry.length) { this.#messages.push(...toRetry); - if (this.#messages.length > 0) { - this.#ensurePendingFlush(); - } + this.#ensurePendingFlush(); } - if (deadLetterQueue) { + if (deadLetterQueueName) { + const deadLetterQueue = + this.#broker.getOrCreateQueue(deadLetterQueueName); toDLQ.forEach((msg) => { - this.#broker.getOrCreateQueue(deadLetterQueue).send(msg.body); + deadLetterQueue.send(msg.body); }); } }