Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for dead letter queues #411

Merged
merged 2 commits into from Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
79 changes: 53 additions & 26 deletions packages/queues/src/broker.ts
Expand Up @@ -18,8 +18,9 @@ export type QueueErrorCode = "ERR_CONSUMER_ALREADY_SET";

export class QueueError extends MiniflareError<QueueErrorCode> {}

export const MAX_ATTEMPTS = 3;
const kShouldAttemptRetry = Symbol("kShouldAttemptRetry");
const kGetPendingRetry = Symbol("kGetPendingRetry");
const kPrepareForRetry = Symbol("kPrepareForRetry");
const kGetFailedAttempts = Symbol("kGetFailedAttempts");

export class Message<Body = unknown> implements MessageInterface<Body> {
readonly body: Body;
Expand Down Expand Up @@ -48,24 +49,17 @@ export class Message<Body = unknown> implements MessageInterface<Body> {
this.#pendingRetry = true;
}

[kShouldAttemptRetry](): boolean {
if (!this.#pendingRetry) {
return false;
}

[kPrepareForRetry]() {
this.#pendingRetry = false;
this.#failedAttempts++;
if (this.#failedAttempts >= MAX_ATTEMPTS) {
this.#log?.warn(
`Dropped message "${this.id}" after ${
this.#failedAttempts
} failed attempts!`
);
return false;
}
}

this.#log?.debug(`Retrying message "${this.id}"...`);
this.#pendingRetry = false;
return true;
[kGetPendingRetry](): boolean {
return this.#pendingRetry;
}

[kGetFailedAttempts](): number {
return this.#failedAttempts;
}
}

Expand Down Expand Up @@ -96,6 +90,7 @@ enum FlushType {
export const kSetFlushCallback = Symbol("kSetFlushCallback");

export class Queue<Body = unknown> implements QueueInterface<Body> {
readonly #broker: QueueBroker;
readonly #queueName: string;
readonly #log?: Log;

Expand All @@ -109,7 +104,8 @@ export class Queue<Body = unknown> implements QueueInterface<Body> {
// A callback to run after a flush() has been executed: useful for testing.
#flushCallback?: () => void;

constructor(queueName: string, log?: Log) {
constructor(broker: QueueBroker, queueName: string, log?: Log) {
this.#broker = broker;
this.#queueName = queueName;
this.#log = log;

Expand Down Expand Up @@ -202,6 +198,8 @@ export class Queue<Body = unknown> implements QueueInterface<Body> {
if (!this.#consumer) {
return;
}
const maxAttempts = this.#consumer.maxRetries + 1;
const deadLetterQueue = this.#consumer.deadLetterQueue;

// Create a batch and execute the queue event handler
const batch = new MessageBatch<Body>(this.#queueName, [...this.#messages]);
Expand All @@ -216,12 +214,41 @@ export class Queue<Body = unknown> implements QueueInterface<Body> {
// Reset state and check for any messages to retry
this.#pendingFlush = FlushType.NONE;
this.#timeout = undefined;
const messagesToRetry = batch.messages.filter((msg) =>
msg[kShouldAttemptRetry]()
);
this.#messages.push(...messagesToRetry);
if (this.#messages.length > 0) {
this.#ensurePendingFlush();

const toRetry: Message<Body>[] = [];
const toDLQ: Message<Body>[] = [];
batch.messages.forEach((msg) => {
if (!msg[kGetPendingRetry]()) {
return;
}

msg[kPrepareForRetry]();
if (msg[kGetFailedAttempts]() < maxAttempts) {
this.#log?.debug(`Retrying message "${msg.id}"...`);
toRetry.push(msg);
} else if (deadLetterQueue) {
this.#log?.warn(
`Moving message "${msg.id}" to dead letter queue "${deadLetterQueue}"...`
);
toDLQ.push(msg);
} else {
this.#log?.warn(
`Dropped message "${msg.id}" after ${maxAttempts} failed attempts!`
);
}
});

if (toRetry.length) {
this.#messages.push(...toRetry);
if (this.#messages.length > 0) {
mrbbot marked this conversation as resolved.
Show resolved Hide resolved
this.#ensurePendingFlush();
}
}

if (deadLetterQueue) {
toDLQ.forEach((msg) => {
this.#broker.getOrCreateQueue(deadLetterQueue).send(msg.body);
mrbbot marked this conversation as resolved.
Show resolved Hide resolved
});
}
}

Expand All @@ -242,7 +269,7 @@ export class QueueBroker implements QueueBrokerInterface {
getOrCreateQueue(name: string): Queue {
let queue = this.#queues.get(name);
if (queue === undefined) {
this.#queues.set(name, (queue = new Queue(name, this.#log)));
this.#queues.set(name, (queue = new Queue(this, name, this.#log)));
}
return queue;
}
Expand Down
3 changes: 3 additions & 0 deletions packages/queues/src/plugin.ts
Expand Up @@ -11,6 +11,7 @@ import {

export const DEFAULT_BATCH_SIZE = 5;
export const DEFAULT_WAIT_MS = 1000;
export const DEFAULT_RETRIES = 2;

export interface BindingOptions {
name: string;
Expand Down Expand Up @@ -106,6 +107,8 @@ export class QueuesPlugin
queueName: opts.queueName,
maxBatchSize: opts.maxBatchSize ?? DEFAULT_BATCH_SIZE,
maxWaitMs: opts.maxWaitMs ?? DEFAULT_WAIT_MS,
maxRetries: opts.maxRetries ?? DEFAULT_RETRIES,
deadLetterQueue: opts.deadLetterQueue,
dispatcher: this.ctx.queueEventDispatcher,
};

Expand Down
78 changes: 71 additions & 7 deletions packages/queues/test/broker.spec.ts
@@ -1,8 +1,4 @@
import {
MAX_ATTEMPTS,
QueueBroker,
kSetFlushCallback,
} from "@miniflare/queues";
import { QueueBroker, kSetFlushCallback } from "@miniflare/queues";
import {
Consumer,
LogLevel,
Expand All @@ -19,6 +15,7 @@ test("QueueBroker: flushes partial batches", async (t) => {
queueName: "myQueue",
maxBatchSize: 5,
maxWaitMs: 1,
maxRetries: 2,
dispatcher: async (_batch) => {},
};
q[kSetConsumer](sub);
Expand Down Expand Up @@ -109,6 +106,7 @@ test("QueueBroker: flushes full batches", async (t) => {
queueName: "myQueue",
maxBatchSize: 5,
maxWaitMs: 1,
maxRetries: 2,
dispatcher: async (_batch) => {},
};
q[kSetConsumer](sub);
Expand Down Expand Up @@ -193,6 +191,7 @@ test("QueueBroker: supports message retry()", async (t) => {
queueName: "myQueue",
maxBatchSize: 5,
maxWaitMs: 1,
maxRetries: 2,
dispatcher: async (_batch) => {},
};
q[kSetConsumer](sub);
Expand Down Expand Up @@ -241,6 +240,7 @@ test("QueueBroker: automatic retryAll() on consumer error", async (t) => {
queueName: "myQueue",
maxBatchSize: 5,
maxWaitMs: 1,
maxRetries: 2,
dispatcher: async (_batch) => {},
};
q[kSetConsumer](sub);
Expand Down Expand Up @@ -299,6 +299,7 @@ test("QueueBroker: drops messages after max retry()", async (t) => {
queueName: "myQueue",
maxBatchSize: 5,
maxWaitMs: 1,
maxRetries: 4,
dispatcher: async (_batch) => {},
};
q[kSetConsumer](sub);
Expand All @@ -312,7 +313,7 @@ test("QueueBroker: drops messages after max retry()", async (t) => {
// Expect the queue to flush() the maximum number of times
q.send("message1");

for (let i = 0; i < MAX_ATTEMPTS; i++) {
for (let i = 0; i < 5; i++) {
const prom = new Promise<void>((resolve) => {
q[kSetFlushCallback](() => resolve());
});
Expand All @@ -323,7 +324,7 @@ test("QueueBroker: drops messages after max retry()", async (t) => {
// Check last log message is warning that message dropped
t.deepEqual(log.logs[log.logs.length - 1], [
LogLevel.WARN,
'Dropped message "myQueue-0" after 3 failed attempts!',
'Dropped message "myQueue-0" after 5 failed attempts!',
]);

// To check that "message1" is dropped:
Expand All @@ -338,3 +339,66 @@ test("QueueBroker: drops messages after max retry()", async (t) => {
});
await prom;
});

test("QueueBroker: dead letter queue support", async (t) => {
const log = new TestLog();
log.error = (message) =>
log.logWithLevel(LogLevel.ERROR, message?.stack ?? "");

const broker = new QueueBroker(log);

// Setup the original queue
const q = broker.getOrCreateQueue("myQueue");
const originalConsumer: Consumer = {
queueName: "myQueue",
maxBatchSize: 5,
maxWaitMs: 1,
maxRetries: 1,
deadLetterQueue: "myDLQ",
dispatcher: async (_batch) => {},
};
q[kSetConsumer](originalConsumer);

const dlq = broker.getOrCreateQueue("myDLQ");
const dlqConsumer: Consumer = {
queueName: "myDLQ",
maxBatchSize: 5,
maxWaitMs: 1,
maxRetries: 0,
dispatcher: async (_batch) => {},
};
dlq[kSetConsumer](dlqConsumer);

// Set up the consumer for the original queue
let originalInvocations = 0;
originalConsumer.dispatcher = async (batch: MessageBatch) => {
batch.messages[0].retry();
originalInvocations++;
};

// Set up the consumer for the dead letter queue
let dlqInvocations = 0;
dlqConsumer.dispatcher = async (_batch: MessageBatch) => {
dlqInvocations++;
};

const originalQProm = new Promise<void>((resolve) => {
q[kSetFlushCallback](() => resolve());
});
q.send("message1");
await originalQProm;

const dlqProm = new Promise<void>((resolve) => {
dlq[kSetFlushCallback](() => resolve());
});
await dlqProm;

t.deepEqual(originalInvocations, 2);
t.deepEqual(dlqInvocations, 1);

// Check last log message is warning that message dropped
t.deepEqual(log.logs[log.logs.length - 1], [
LogLevel.WARN,
'Moving message "myQueue-0" to dead letter queue "myDLQ"...',
]);
});
2 changes: 2 additions & 0 deletions packages/shared/src/queues.ts
Expand Up @@ -15,6 +15,8 @@ export interface Consumer {
queueName: string;
maxBatchSize: number;
maxWaitMs: number;
maxRetries: number;
deadLetterQueue?: string;
dispatcher: QueueEventDispatcher;
}

Expand Down