From 42dfd0c3f95fc680ebad9df691bde75f2b485cfc Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Mon, 1 Nov 2021 12:00:01 -0700 Subject: [PATCH] Implement new trigger format (#3872) * Introduce new trigger type * Implement task queue triggers. 1. Renamed the retry policy from last change to match the spec. 2. We do not delete queues because they cannot be recreated for 4 days. We may eventually need a utility to delete all disabled queues. We cannot make such a utility only clean up our queues because Cloud Tasks does not support labels. --- src/api.js | 4 + src/deploy/functions/backend.ts | 4 +- src/deploy/functions/release/fabricator.ts | 87 +++++-- src/deploy/functions/release/planner.ts | 2 + src/deploy/functions/release/reporter.ts | 2 +- .../functions/runtimes/discovery/v1alpha1.ts | 6 +- .../functions/runtimes/node/parseTriggers.ts | 2 +- src/gcp/cloudtasks.ts | 237 ++++++++++++++++++ src/gcp/proto.ts | 4 +- .../functions/release/fabricator.spec.ts | 107 +++++++- .../runtimes/discovery/v1alpha1.spec.ts | 12 +- src/test/gcp/cloudtasks.spec.ts | 218 ++++++++++++++++ 12 files changed, 645 insertions(+), 40 deletions(-) create mode 100644 src/gcp/cloudtasks.ts create mode 100644 src/test/gcp/cloudtasks.spec.ts diff --git a/src/api.js b/src/api.js index 63743d6998d..8e9424b55f3 100644 --- a/src/api.js +++ b/src/api.js @@ -155,6 +155,10 @@ var api = { "FIREBASE_CLOUDSCHEDULER_URL", "https://cloudscheduler.googleapis.com" ), + cloudTasksOrigin: utils.envOverride( + "FIREBASE_CLOUD_TAKS_URL", + "https://cloudtasks.googleapis.com" + ), pubsubOrigin: utils.envOverride("FIREBASE_PUBSUB_URL", "https://pubsub.googleapis.com"), googleOrigin: utils.envOverride( "FIREBASE_TOKEN_URL", diff --git a/src/deploy/functions/backend.ts b/src/deploy/functions/backend.ts index 5ee8348a4ca..f26a7e612b0 100644 --- a/src/deploy/functions/backend.ts +++ b/src/deploy/functions/backend.ts @@ -95,7 +95,7 @@ export interface TaskQueueRateLimits { maxDispatchesPerSecond?: number; } -export interface TaskQueueRetryPolicy { +export interface TaskQueueRetryConfig { maxAttempts?: number; maxRetryDuration?: proto.Duration; minBackoff?: proto.Duration; @@ -105,7 +105,7 @@ export interface TaskQueueRetryPolicy { export interface TaskQueueTrigger { rateLimits?: TaskQueueRateLimits; - retryPolicy?: TaskQueueRetryPolicy; + retryConfig?: TaskQueueRetryConfig; invoker?: string[]; } diff --git a/src/deploy/functions/release/fabricator.ts b/src/deploy/functions/release/fabricator.ts index 76d99793bcd..d3f46d3e036 100644 --- a/src/deploy/functions/release/fabricator.ts +++ b/src/deploy/functions/release/fabricator.ts @@ -9,6 +9,7 @@ import { getHumanFriendlyRuntimeName } from "../runtimes"; import { functionsOrigin, functionsV2Origin } from "../../../api"; import { logger } from "../../../logger"; import * as backend from "../backend"; +import * as cloudtasks from "../../../gcp/cloudtasks"; import * as deploymentTool from "../../../deploymentTool"; import * as gcf from "../../../gcp/cloudfunctions"; import * as gcfV2 from "../../../gcp/cloudfunctionsv2"; @@ -227,6 +228,17 @@ export class Fabricator { }) .catch(rethrowAs(endpoint, "set invoker")); } + } else if (backend.isTaskQueueTriggered(endpoint)) { + // Like HTTPS triggers, taskQueueTriggers have an invoker, but unlike HTTPS they don't default + // public. + const invoker = endpoint.taskQueueTrigger.invoker; + if (invoker && !invoker.includes("private")) { + await this.executor + .run(async () => { + await gcf.setInvokerCreate(endpoint.project, backend.functionName(endpoint), invoker); + }) + .catch(rethrowAs(endpoint, "set invoker")); + } } } @@ -280,6 +292,17 @@ export class Fabricator { .run(() => run.setInvokerCreate(endpoint.project, serviceName, invoker)) .catch(rethrowAs(endpoint, "set invoker")); } + } else if (backend.isTaskQueueTriggered(endpoint)) { + // Like HTTPS triggers, taskQueueTriggers have an invoker, but unlike HTTPS they don't default + // public. + const invoker = endpoint.taskQueueTrigger.invoker; + if (invoker && !invoker.includes("private")) { + await this.executor + .run(async () => { + await gcf.setInvokerCreate(endpoint.project, backend.functionName(endpoint), invoker); + }) + .catch(rethrowAs(endpoint, "set invoker")); + } } await this.setConcurrency( @@ -309,16 +332,15 @@ export class Fabricator { .catch(rethrowAs(endpoint, "update")); endpoint.uri = resultFunction?.httpsTrigger?.url; - if (backend.isHttpsTriggered(endpoint) && endpoint.httpsTrigger.invoker) { + let invoker: string[] | undefined; + if (backend.isHttpsTriggered(endpoint)) { + invoker = endpoint.httpsTrigger.invoker; + } else if (backend.isTaskQueueTriggered(endpoint)) { + invoker = endpoint.taskQueueTrigger.invoker; + } + if (invoker) { await this.executor - .run(async () => { - await gcf.setInvokerUpdate( - endpoint.project, - backend.functionName(endpoint), - endpoint.httpsTrigger.invoker! - ); - return; - }) + .run(() => gcf.setInvokerUpdate(endpoint.project, backend.functionName(endpoint), invoker!)) .catch(rethrowAs(endpoint, "set invoker")); } } @@ -338,7 +360,7 @@ export class Fabricator { delete apiFunction.eventTrigger.pubsubTopic; } - const resultFunction = (await this.functionExecutor + const resultFunction = await this.functionExecutor .run(async () => { const op: { name: string } = await gcfV2.updateFunction(apiFunction); return await poller.pollOperation({ @@ -347,15 +369,19 @@ export class Fabricator { operationResourceName: op.name, }); }) - .catch(rethrowAs(endpoint, "update"))) as gcfV2.CloudFunction; + .catch(rethrowAs(endpoint, "update")); endpoint.uri = resultFunction.serviceConfig.uri; const serviceName = resultFunction.serviceConfig.service!; - if (backend.isHttpsTriggered(endpoint) && endpoint.httpsTrigger.invoker) { + let invoker: string[] | undefined; + if (backend.isHttpsTriggered(endpoint)) { + invoker = endpoint.httpsTrigger.invoker; + } else if (backend.isTaskQueueTriggered(endpoint)) { + invoker = endpoint.taskQueueTrigger.invoker; + } + if (invoker) { await this.executor - .run(() => - run.setInvokerUpdate(endpoint.project, serviceName, endpoint.httpsTrigger.invoker!) - ) + .run(() => run.setInvokerUpdate(endpoint.project, serviceName, invoker!)) .catch(rethrowAs(endpoint, "set invoker")); } @@ -443,7 +469,7 @@ export class Fabricator { } assertExhaustive(endpoint.platform); } else if (backend.isTaskQueueTriggered(endpoint)) { - await this.deleteTaskQueue(endpoint); + await this.disableTaskQueue(endpoint); } } @@ -461,10 +487,19 @@ export class Fabricator { ); } - upsertTaskQueue(endpoint: backend.Endpoint & backend.TaskQueueTriggered): Promise { - return Promise.reject( - new reporter.DeploymentError(endpoint, "upsert task queue", new Error("Not implemented")) - ); + async upsertTaskQueue(endpoint: backend.Endpoint & backend.TaskQueueTriggered): Promise { + const queue = cloudtasks.queueFromEndpoint(endpoint); + await this.executor + .run(() => cloudtasks.upsertQueue(queue)) + .catch(rethrowAs(endpoint, "upsert task queue")); + + // Note: should we split setTrigger into createTrigger and updateTrigger so we can avoid a + // getIamPolicy on create? + if (endpoint.taskQueueTrigger.invoker) { + await this.executor + .run(() => cloudtasks.setEnqueuer(queue.name, endpoint.taskQueueTrigger.invoker!)) + .catch(rethrowAs(endpoint, "set invoker")); + } } async deleteScheduleV1(endpoint: backend.Endpoint & backend.ScheduleTriggered): Promise { @@ -484,10 +519,14 @@ export class Fabricator { ); } - deleteTaskQueue(endpoint: backend.Endpoint & backend.TaskQueueTriggered): Promise { - return Promise.reject( - new reporter.DeploymentError(endpoint, "delete task queue", new Error("Not implemented")) - ); + async disableTaskQueue(endpoint: backend.Endpoint & backend.TaskQueueTriggered): Promise { + const update = { + name: cloudtasks.queueNameForEndpoint(endpoint), + state: "DISABLED" as cloudtasks.State, + }; + await this.executor + .run(() => cloudtasks.updateQueue(update)) + .catch(rethrowAs(endpoint, "disable task queue")); } logOpStart(op: string, endpoint: backend.Endpoint): void { diff --git a/src/deploy/functions/release/planner.ts b/src/deploy/functions/release/planner.ts index 81b38424137..3b844e58e81 100644 --- a/src/deploy/functions/release/planner.ts +++ b/src/deploy/functions/release/planner.ts @@ -204,6 +204,8 @@ export function checkForIllegalUpdate(want: backend.Endpoint, have: backend.Endp return "a background triggered"; } else if (backend.isScheduleTriggered(e)) { return "a scheduled"; + } else if (backend.isTaskQueueTriggered(e)) { + return "a task queue"; } // Unfortunately TypeScript isn't like Scala and I can't prove to it // that all cases have been handled diff --git a/src/deploy/functions/release/reporter.ts b/src/deploy/functions/release/reporter.ts index e9afa9d3bdc..53ed63dde8b 100644 --- a/src/deploy/functions/release/reporter.ts +++ b/src/deploy/functions/release/reporter.ts @@ -24,7 +24,7 @@ export type OperationType = | "upsert schedule" | "delete schedule" | "upsert task queue" - | "delete task queue" + | "disable task queue" | "create topic" | "delete topic" | "set invoker" diff --git a/src/deploy/functions/runtimes/discovery/v1alpha1.ts b/src/deploy/functions/runtimes/discovery/v1alpha1.ts index 68ff8040314..7ddf2d9a6e5 100644 --- a/src/deploy/functions/runtimes/discovery/v1alpha1.ts +++ b/src/deploy/functions/runtimes/discovery/v1alpha1.ts @@ -148,7 +148,7 @@ function parseEndpoints( } else if (backend.isTaskQueueTriggered(ep)) { assertKeyTypes(prefix + ".taskQueueTrigger", ep.taskQueueTrigger, { rateLimits: "object", - retryPolicy: "object", + retryConfig: "object", invoker: "array", }); if (ep.taskQueueTrigger.rateLimits) { @@ -158,8 +158,8 @@ function parseEndpoints( maxDispatchesPerSecond: "number", }); } - if (ep.taskQueueTrigger.retryPolicy) { - assertKeyTypes(prefix + ".taskQueueTrigger.retryPolicy", ep.taskQueueTrigger.retryPolicy, { + if (ep.taskQueueTrigger.retryConfig) { + assertKeyTypes(prefix + ".taskQueueTrigger.retryConfig", ep.taskQueueTrigger.retryConfig, { maxAttempts: "number", maxRetryDuration: "string", minBackoff: "string", diff --git a/src/deploy/functions/runtimes/node/parseTriggers.ts b/src/deploy/functions/runtimes/node/parseTriggers.ts index 67cb2e5d7e4..fd4cc15597b 100644 --- a/src/deploy/functions/runtimes/node/parseTriggers.ts +++ b/src/deploy/functions/runtimes/node/parseTriggers.ts @@ -65,7 +65,7 @@ export interface TriggerAnnotation { maxConcurrentDispatches?: number; maxDispatchesPerSecond?: number; }; - retryPolicy?: { + retryConfig?: { maxAttempts?: number; maxRetryDuration?: proto.Duration; minBackoff?: proto.Duration; diff --git a/src/gcp/cloudtasks.ts b/src/gcp/cloudtasks.ts new file mode 100644 index 00000000000..6e48ab2587a --- /dev/null +++ b/src/gcp/cloudtasks.ts @@ -0,0 +1,237 @@ +import * as proto from "./proto"; + +import { Client } from "../apiv2"; +import { cloudTasksOrigin } from "../api"; +import * as iam from "./iam"; +import * as backend from "../deploy/functions/backend"; + +const API_VERSION = "v2"; + +const client = new Client({ + urlPrefix: cloudTasksOrigin, + auth: true, + apiVersion: API_VERSION, +}); + +export interface AppEngineRouting { + service: string; + version: string; + instance: string; + host: string; +} + +export interface RateLimits { + maxDispatchesPerSecond?: number; + maxBurstSize?: number; + maxConcurrentDispatches?: number; +} + +export interface RetryConfig { + maxAttempts?: number; + maxRetryDuration?: proto.Duration; + minBackoff?: proto.Duration; + maxBackoff?: proto.Duration; + maxDoublings?: number; +} + +export interface StackdriverLoggingConfig { + samplingRatio: number; +} + +export type State = "RUNNING" | "PAUSED" | "DISABLED"; + +export interface Queue { + name: string; + appEngienRoutingOverride?: AppEngineRouting; + rateLimits?: RateLimits; + retryConfig?: RetryConfig; + state?: State; +} + +/** + * The client-side defaults we set for a queue. + * Unlike most APIs, Cloud Tasks doesn't omit fields which + * have default values. This means when we create a queue without + * maxDoublings, for example, it will be returned as a queue with + * maxDoublings set to 16. By setting our in-memory queue to the + * server-side defaults we'll be able to more accurately see whether + * our in-memory representation matches the current state on upsert + * and avoid a PUT call. + * NOTE: we explicitly _don't_ have the same default for + * retryConfig.maxAttempts. The server-side default is effectively + * infinite, which can cause customers to have runaway bills if the + * function crashes. We settled on a Firebase default of 3 since + * infrastructure errors also count against this limit and 1-(1-99.9%)^3 + * means we'll have 9-9s reliability of invoking the customer's + * function at least once (though unfortuantely this math assumes + * failures are independent events, which is generally untrue). + */ +export const DEFAULT_SETTINGS: Omit = { + rateLimits: { + maxConcurrentDispatches: 1000, + maxBurstSize: 100, + maxDispatchesPerSecond: 500, + }, + state: "RUNNING", + retryConfig: { + maxDoublings: 16, + maxAttempts: 3, + maxBackoff: "3600s", + minBackoff: "0.100s", + }, +}; + +/** Create a Queue that matches the spec. */ +export async function createQueue(queue: Queue): Promise { + const path = queue.name.substring(0, queue.name.lastIndexOf("/")); + const res = await client.post(path, queue); + return res.body; +} + +/** Get the Queue for a given name. */ +export async function getQueue(name: string): Promise { + const res = await client.get(name); + return res.body; +} + +/** Updates a queue to match the passed parameter. */ +export async function updateQueue(queue: Partial & { name: string }): Promise { + const res = await client.patch(queue.name, queue, { + queryParams: { updateMask: proto.fieldMasks(queue).join(",") }, + }); + return res.body; +} + +/** Ensures a queue exists with the given spec. Returns true if created and false if updated/left alone. */ +export async function upsertQueue(queue: Queue): Promise { + try { + // Here and throughout we use module.exports to ensure late binding & enable stubs in unit tests. + const existing = await (module.exports.getQueue as typeof getQueue)(queue.name); + if (JSON.stringify(queue) === JSON.stringify(existing)) { + return false; + } + + if (existing.state === "DISABLED") { + await (module.exports.purgeQueue as typeof purgeQueue)(queue.name); + } + + await (module.exports.updateQueue as typeof updateQueue)(queue); + return false; + } catch (err) { + if (err?.context?.response?.statusCode === 404) { + await (module.exports.createQueue as typeof createQueue)(queue); + return true; + } + throw err; + } +} + +/** Purges all messages in a queue with a given name. */ +export async function purgeQueue(name: string): Promise { + await client.post(`${name}:purge`); +} + +/** Deletes a queue with a given name. */ +export async function deleteQueue(name: string): Promise { + await client.delete(name); +} + +/** Set the IAM policy of a given queue. */ +export async function setIamPolicy(name: string, policy: iam.Policy): Promise { + const res = await client.post<{ policy: iam.Policy }, iam.Policy>(`${name}:setIamPolicy`, { + policy, + }); + return res.body; +} + +/** Returns the IAM policy of a given queue. */ +export async function getIamPolicy(name: string): Promise { + const res = await client.post(`${name}:getIamPolicy`); + return res.body; +} + +const ENQUEUER_ROLE = "roles/cloudtasks.enqueuer"; + +/** Ensures that the invoker policy is set for a given queue. */ +export async function setEnqueuer( + name: string, + invoker: string[], + assumeEmpty: boolean = false +): Promise { + let existing: iam.Policy; + if (assumeEmpty) { + existing = { + bindings: [], + etag: "", + version: 3, + }; + } else { + existing = await (module.exports.getIamPolicy as typeof getIamPolicy)(name); + } + + const [, project] = name.split("/"); + const invokerMembers = proto.getInvokerMembers(invoker, project); + while (true) { + const policy: iam.Policy = { + bindings: existing.bindings.filter((binding) => binding.role != ENQUEUER_ROLE), + etag: existing.etag, + version: existing.version, + }; + + if (invokerMembers.length) { + policy.bindings.push({ role: ENQUEUER_ROLE, members: invokerMembers }); + } + + if (JSON.stringify(policy) === JSON.stringify(existing)) { + return; + } + + try { + await (module.exports.setIamPolicy as typeof setIamPolicy)(name, policy); + return; + } catch (err) { + // Re-fetch on conflict + if (err?.context?.response?.statusCode === 429) { + existing = await (module.exports.getIamPolicy as typeof getIamPolicy)(name); + continue; + } + throw err; + } + } +} + +/** The name of the Task Queue we will use for this endpoint. */ +export function queueNameForEndpoint( + endpoint: backend.Endpoint & backend.TaskQueueTriggered +): string { + return `projects/${endpoint.project}/locations/${endpoint.region}/queues/${endpoint.id}`; +} + +/** Creates an API type from an Endpoint type */ +export function queueFromEndpoint(endpoint: backend.Endpoint & backend.TaskQueueTriggered): Queue { + const queue: Required = { + ...(JSON.parse(JSON.stringify(DEFAULT_SETTINGS)) as Omit, "name">), + name: queueNameForEndpoint(endpoint), + }; + if (endpoint.taskQueueTrigger.rateLimits) { + proto.copyIfPresent( + queue.rateLimits, + endpoint.taskQueueTrigger.rateLimits, + "maxBurstSize", + "maxConcurrentDispatches", + "maxDispatchesPerSecond" + ); + } + if (endpoint.taskQueueTrigger.retryConfig) { + proto.copyIfPresent( + queue.retryConfig, + endpoint.taskQueueTrigger.retryConfig, + "maxAttempts", + "maxBackoff", + "maxDoublings", + "maxRetryDuration", + "minBackoff" + ); + } + return queue; +} diff --git a/src/gcp/proto.ts b/src/gcp/proto.ts index e34030efd3c..21f4edcf50f 100644 --- a/src/gcp/proto.ts +++ b/src/gcp/proto.ts @@ -133,10 +133,10 @@ function fieldMasksHelper( * @throws {@link FirebaseError} if any invoker string is empty or not of the correct form */ export function getInvokerMembers(invoker: string[], projectId: string): string[] { - if (invoker[0] === "private") { + if (invoker.includes("private")) { return []; } - if (invoker[0] === "public") { + if (invoker.includes("public")) { return ["allUsers"]; } return invoker.map((inv) => formatServiceAccount(inv, projectId)); diff --git a/src/test/deploy/functions/release/fabricator.spec.ts b/src/test/deploy/functions/release/fabricator.spec.ts index c12b6cca274..b40c5b9fff8 100644 --- a/src/test/deploy/functions/release/fabricator.spec.ts +++ b/src/test/deploy/functions/release/fabricator.spec.ts @@ -10,6 +10,7 @@ import * as pollerNS from "../../../../operation-poller"; import * as pubsubNS from "../../../../gcp/pubsub"; import * as schedulerNS from "../../../../gcp/cloudscheduler"; import * as runNS from "../../../../gcp/run"; +import * as cloudtasksNS from "../../../../gcp/cloudtasks"; import * as backend from "../../../../deploy/functions/backend"; import * as scraper from "../../../../deploy/functions/release/sourceTokenScraper"; import * as planner from "../../../../deploy/functions/release/planner"; @@ -22,6 +23,7 @@ describe("Fabricator", () => { let pubsub: sinon.SinonStubbedInstance; let scheduler: sinon.SinonStubbedInstance; let run: sinon.SinonStubbedInstance; + let tasks: sinon.SinonStubbedInstance; beforeEach(() => { gcf = sinon.stub(gcfNS); @@ -30,10 +32,13 @@ describe("Fabricator", () => { pubsub = sinon.stub(pubsubNS); scheduler = sinon.stub(schedulerNS); run = sinon.stub(runNS); + tasks = sinon.stub(cloudtasksNS); gcf.functionFromEndpoint.restore(); gcfv2.functionFromEndpoint.restore(); scheduler.jobFromEndpoint.restore(); + tasks.queueFromEndpoint.restore(); + tasks.queueNameForEndpoint.restore(); gcf.createFunction.rejects(new Error("unexpected gcf.createFunction")); gcf.updateFunction.rejects(new Error("unexpected gcf.updateFunction")); gcf.deleteFunction.rejects(new Error("unexpected gcf.deleteFunction")); @@ -54,6 +59,13 @@ describe("Fabricator", () => { pubsub.deleteTopic.rejects(new Error("unexpected pubsub.deleteTopic")); scheduler.createOrReplaceJob.rejects(new Error("unexpected scheduler.createOrReplaceJob")); scheduler.deleteJob.rejects(new Error("unexpected scheduler.deleteJob")); + tasks.upsertQueue.rejects(new Error("unexpected tasks.upsertQueue")); + tasks.createQueue.rejects(new Error("unexpected tasks.createQueue")); + tasks.updateQueue.rejects(new Error("unexpected tasks.updateQueue")); + tasks.deleteQueue.rejects(new Error("unexpected tasks.deleteQueue")); + tasks.setEnqueuer.rejects(new Error("unexpected tasks.setEnqueuer")); + tasks.setIamPolicy.rejects(new Error("unexpected tasks.setIamPolicy")); + tasks.getIamPolicy.rejects(new Error("unexpected tasks.getIamPolicy")); }); afterEach(() => { @@ -602,6 +614,78 @@ describe("Fabricator", () => { }); }); + describe("upsertTaskQueue", () => { + it("upserts task queues", async () => { + const ep = endpoint({ + taskQueueTrigger: {}, + }) as backend.Endpoint & backend.TaskQueueTriggered; + tasks.upsertQueue.resolves(); + await fab.upsertTaskQueue(ep); + expect(tasks.upsertQueue).to.have.been.called; + expect(tasks.setEnqueuer).to.not.have.been.called; + }); + + it("sets enqueuer", async () => { + const ep = endpoint({ + taskQueueTrigger: { + invoker: ["public"], + }, + }) as backend.Endpoint & backend.TaskQueueTriggered; + tasks.upsertQueue.resolves(); + tasks.setEnqueuer.resolves(); + await fab.upsertTaskQueue(ep); + expect(tasks.upsertQueue).to.have.been.called; + expect(tasks.setEnqueuer).to.have.been.calledWithMatch(tasks.queueNameForEndpoint(ep), [ + "public", + ]); + }); + + it("wraps errors", async () => { + const ep = endpoint({ + taskQueueTrigger: { + invoker: ["public"], + }, + }) as backend.Endpoint & backend.TaskQueueTriggered; + tasks.upsertQueue.rejects(new Error("oh no")); + await expect(fab.upsertTaskQueue(ep)).to.eventually.be.rejectedWith( + reporter.DeploymentError, + "upsert task queue" + ); + + tasks.upsertQueue.resolves(); + tasks.setEnqueuer.rejects(new Error("nope")); + await expect(fab.upsertTaskQueue(ep)).to.eventually.be.rejectedWith( + reporter.DeploymentError, + "set invoker" + ); + }); + }); + + describe("disableTaskQueue", () => { + it("disables task queues", async () => { + const ep = endpoint({ + taskQueueTrigger: {}, + }) as backend.Endpoint & backend.TaskQueueTriggered; + tasks.updateQueue.resolves(); + await fab.disableTaskQueue(ep); + expect(tasks.updateQueue).to.have.been.calledWith({ + name: tasks.queueNameForEndpoint(ep), + state: "DISABLED", + }); + }); + + it("wraps errors", async () => { + const ep = endpoint({ + taskQueueTrigger: {}, + }) as backend.Endpoint & backend.TaskQueueTriggered; + tasks.updateQueue.rejects(new Error("Not today")); + await expect(fab.disableTaskQueue(ep)).to.eventually.be.rejectedWith( + reporter.DeploymentError, + "disable task queue" + ); + }); + }); + describe("setTrigger", () => { it("does nothing for HTTPS functions", async () => { // all APIs throw by default @@ -642,6 +726,17 @@ describe("Fabricator", () => { await fab.setTrigger(ep); expect(upsertScheduleV2).to.have.been.called; }); + + it("sets task queue triggers", async () => { + const ep = endpoint({ + taskQueueTrigger: {}, + }); + const upsertTaskQueue = sinon.stub(fab, "upsertTaskQueue"); + upsertTaskQueue.resolves(); + + await fab.setTrigger(ep); + expect(upsertTaskQueue).to.have.been.called; + }); }); describe("deleteTrigger", () => { @@ -664,7 +759,7 @@ describe("Fabricator", () => { await fab.deleteTrigger(ep); }); - it("sets schedule triggers", async () => { + it("deletes schedule triggers", async () => { const ep = endpoint({ scheduleTrigger: { schedule: "every 5 minutes", @@ -684,6 +779,16 @@ describe("Fabricator", () => { await fab.deleteTrigger(ep); expect(deleteScheduleV2).to.have.been.called; }); + + it("deletes task queue triggers", async () => { + const ep = endpoint({ + taskQueueTrigger: {}, + }); + const disableTaskQueue = sinon.stub(fab, "disableTaskQueue"); + + await fab.deleteTrigger(ep); + expect(disableTaskQueue).to.have.been.called; + }); }); describe("createEndpoint", () => { diff --git a/src/test/deploy/functions/runtimes/discovery/v1alpha1.spec.ts b/src/test/deploy/functions/runtimes/discovery/v1alpha1.spec.ts index 8983f537b9c..58e37020f89 100644 --- a/src/test/deploy/functions/runtimes/discovery/v1alpha1.spec.ts +++ b/src/test/deploy/functions/runtimes/discovery/v1alpha1.spec.ts @@ -210,7 +210,7 @@ describe("backendFromV1Alpha1", () => { maxConcurrentDispatches: 10, maxDispatchesPerSecond: 20, }, - retryPolicy: { + retryConfig: { maxAttempts: 3, maxRetryDuration: "120s", minBackoff: "1s", @@ -238,19 +238,19 @@ describe("backendFromV1Alpha1", () => { }); } - const invalidRetryPolicies = { + const invalidRetryConfigs = { maxAttempts: "3", maxRetryDuration: 120, minBackoff: 1, maxBackoff: 30, maxDoublings: "5", }; - for (const [key, value] of Object.entries(invalidRetryPolicies)) { - const retryPolicy = { - ...validTrigger.retryPolicy, + for (const [key, value] of Object.entries(invalidRetryConfigs)) { + const retryConfig = { + ...validTrigger.retryConfig, [key]: value, }; - const taskQueueTrigger = { ...validTrigger, retryPolicy }; + const taskQueueTrigger = { ...validTrigger, retryConfig }; assertParserError({ endpoints: { func: { ...MIN_ENDPOINT, taskQueueTrigger }, diff --git a/src/test/gcp/cloudtasks.spec.ts b/src/test/gcp/cloudtasks.spec.ts new file mode 100644 index 00000000000..81cf4a8fd33 --- /dev/null +++ b/src/test/gcp/cloudtasks.spec.ts @@ -0,0 +1,218 @@ +import { expect } from "chai"; +import * as sinon from "sinon"; + +import * as iam from "../../gcp/iam"; +import * as backend from "../../deploy/functions/backend"; +import * as cloudtasks from "../../gcp/cloudtasks"; + +describe("CloudTasks", () => { + let ct: sinon.SinonStubbedInstance; + const ENDPOINT: backend.Endpoint & backend.TaskQueueTriggered = { + platform: "gcfv2", + id: "id", + region: "region", + project: "project", + entryPoint: "id", + runtime: "nodejs16", + taskQueueTrigger: {}, + }; + + beforeEach(() => { + ct = sinon.stub(cloudtasks); + ct.queueNameForEndpoint.restore(); + ct.queueFromEndpoint.restore(); + ct.setEnqueuer.restore(); + ct.upsertQueue.restore(); + }); + + afterEach(() => { + sinon.verifyAndRestore(); + }); + + describe("queueFromEndpoint", () => { + it("handles minimal endpoints", () => { + expect(cloudtasks.queueFromEndpoint(ENDPOINT)).to.deep.equal({ + ...cloudtasks.DEFAULT_SETTINGS, + name: "projects/project/locations/region/queues/id", + }); + }); + + it("handles complex endpoints", () => { + const rateLimits: backend.TaskQueueRateLimits = { + maxBurstSize: 100, + maxConcurrentDispatches: 5, + maxDispatchesPerSecond: 5, + }; + const retryConfig: backend.TaskQueueRetryConfig = { + maxAttempts: 10, + maxBackoff: "60s", + maxDoublings: 9, + maxRetryDuration: "300s", + minBackoff: "1s", + }; + + const ep: backend.Endpoint = { + ...ENDPOINT, + taskQueueTrigger: { + rateLimits, + retryConfig, + invoker: ["robot@"], + }, + }; + expect(cloudtasks.queueFromEndpoint(ep)).to.deep.equal({ + name: "projects/project/locations/region/queues/id", + rateLimits, + retryConfig, + state: "RUNNING", + }); + }); + }); + + describe("upsertEndpoint", () => { + it("accepts a matching queue", async () => { + const queue: cloudtasks.Queue = { + name: "projects/p/locations/r/queues/f", + ...cloudtasks.DEFAULT_SETTINGS, + }; + ct.getQueue.resolves(queue); + + await cloudtasks.upsertQueue(queue); + + expect(ct.getQueue).to.have.been.called; + expect(ct.updateQueue).to.not.have.been.called; + expect(ct.purgeQueue).to.not.have.been.called; + }); + + it("updates a non-matching queue", async () => { + const wantQueue: cloudtasks.Queue = { + name: "projects/p/locations/r/queues/f", + ...cloudtasks.DEFAULT_SETTINGS, + rateLimits: { + maxBurstSize: 9_000, + }, + }; + const haveQueue: cloudtasks.Queue = { + name: "projects/p/locations/r/queues/f", + ...cloudtasks.DEFAULT_SETTINGS, + }; + ct.getQueue.resolves(haveQueue); + + await cloudtasks.upsertQueue(wantQueue); + + expect(ct.getQueue).to.have.been.called; + expect(ct.updateQueue).to.have.been.called; + expect(ct.purgeQueue).to.not.have.been.called; + }); + + it("purges a disabled queue", async () => { + const wantQueue: cloudtasks.Queue = { + name: "projects/p/locations/r/queues/f", + ...cloudtasks.DEFAULT_SETTINGS, + }; + const haveQueue: cloudtasks.Queue = { + name: "projects/p/locations/r/queues/f", + ...cloudtasks.DEFAULT_SETTINGS, + state: "DISABLED", + }; + ct.getQueue.resolves(haveQueue); + + await cloudtasks.upsertQueue(wantQueue); + + expect(ct.getQueue).to.have.been.called; + expect(ct.updateQueue).to.have.been.called; + expect(ct.purgeQueue).to.have.been.called; + }); + }); + + describe("setEnqueuer", () => { + const NAME = "projects/p/locations/r/queues/f"; + const ADMIN_BINDING: iam.Binding = { + role: "roles/cloudtasks.admin", + members: ["user:sundar@google.com"], + }; + // Not that anyone should actually make these public, + // it makes for easier testing. + const PUBLIC_ENQUEUER_BINDING: iam.Binding = { + role: "roles/cloudtasks.enqueuer", + members: ["allUsers"], + }; + it("can blind-write", async () => { + await cloudtasks.setEnqueuer(NAME, ["private"], /* assumeEmpty= */ true); + expect(ct.getIamPolicy).to.not.have.been.called; + expect(ct.setIamPolicy).to.not.have.been.called; + + await cloudtasks.setEnqueuer(NAME, ["public"], /* assumeEmpty= */ true); + expect(ct.getIamPolicy).to.not.have.been.called; + expect(ct.setIamPolicy).to.have.been.calledWith(NAME, { + bindings: [PUBLIC_ENQUEUER_BINDING], + etag: "", + version: 3, + }); + }); + + it("preserves other roles", async () => { + ct.getIamPolicy.resolves({ + bindings: [ADMIN_BINDING, PUBLIC_ENQUEUER_BINDING], + etag: "", + version: 3, + }); + + await cloudtasks.setEnqueuer(NAME, ["private"]); + expect(ct.getIamPolicy).to.have.been.called; + expect(ct.setIamPolicy).to.have.been.calledWith(NAME, { + bindings: [ADMIN_BINDING], + etag: "", + version: 3, + }); + }); + + it("noops existing matches", async () => { + ct.getIamPolicy.resolves({ + bindings: [ADMIN_BINDING, PUBLIC_ENQUEUER_BINDING], + etag: "", + version: 3, + }); + + await cloudtasks.setEnqueuer(NAME, ["public"]); + expect(ct.getIamPolicy).to.have.been.called; + expect(ct.setIamPolicy).to.not.have.been.called; + }); + + it("can insert an enqueuer binding", async () => { + ct.getIamPolicy.resolves({ + bindings: [ADMIN_BINDING], + etag: "", + version: 3, + }); + + await cloudtasks.setEnqueuer(NAME, ["public"]); + expect(ct.getIamPolicy).to.have.been.called; + expect(ct.setIamPolicy).to.have.been.calledWith(NAME, { + bindings: [ADMIN_BINDING, PUBLIC_ENQUEUER_BINDING], + etag: "", + version: 3, + }); + }); + + it("can resolve conflicts", async () => { + ct.getIamPolicy.onCall(0).resolves({ + bindings: [ADMIN_BINDING], + etag: "", + version: 3, + }); + ct.getIamPolicy.onCall(1).resolves({ + bindings: [ADMIN_BINDING], + etag: "2", + version: 3, + }); + ct.setIamPolicy.onCall(0).rejects({ context: { response: { statusCode: 429 } } }); + + await cloudtasks.setEnqueuer(NAME, ["public"]); + expect(ct.getIamPolicy).to.have.been.calledTwice; + expect(ct.setIamPolicy).to.have.been.calledTwice; + expect(ct.setIamPolicy).to.have.been.calledWithMatch(NAME, { + etag: "2", + }); + }); + }); +});