Skip to content

Commit

Permalink
Implement new trigger format (#3872)
Browse files Browse the repository at this point in the history
* Introduce new trigger type

* Implement task queue triggers.

1. Renamed the retry policy from last change to match the spec.
2. We do not delete queues because they cannot be recreated for 4 days.
   We may eventually need a utility to delete all disabled queues. We
   cannot make such a utility only clean up our queues because Cloud
   Tasks does not support labels.
  • Loading branch information
inlined authored and kroikie committed Mar 4, 2022
1 parent 13d0400 commit 42dfd0c
Show file tree
Hide file tree
Showing 12 changed files with 645 additions and 40 deletions.
4 changes: 4 additions & 0 deletions src/api.js
Expand Up @@ -155,6 +155,10 @@ var api = {
"FIREBASE_CLOUDSCHEDULER_URL",
"https://cloudscheduler.googleapis.com"
),
cloudTasksOrigin: utils.envOverride(
"FIREBASE_CLOUD_TAKS_URL",
"https://cloudtasks.googleapis.com"
),
pubsubOrigin: utils.envOverride("FIREBASE_PUBSUB_URL", "https://pubsub.googleapis.com"),
googleOrigin: utils.envOverride(
"FIREBASE_TOKEN_URL",
Expand Down
4 changes: 2 additions & 2 deletions src/deploy/functions/backend.ts
Expand Up @@ -95,7 +95,7 @@ export interface TaskQueueRateLimits {
maxDispatchesPerSecond?: number;
}

export interface TaskQueueRetryPolicy {
export interface TaskQueueRetryConfig {
maxAttempts?: number;
maxRetryDuration?: proto.Duration;
minBackoff?: proto.Duration;
Expand All @@ -105,7 +105,7 @@ export interface TaskQueueRetryPolicy {

export interface TaskQueueTrigger {
rateLimits?: TaskQueueRateLimits;
retryPolicy?: TaskQueueRetryPolicy;
retryConfig?: TaskQueueRetryConfig;
invoker?: string[];
}

Expand Down
87 changes: 63 additions & 24 deletions src/deploy/functions/release/fabricator.ts
Expand Up @@ -9,6 +9,7 @@ import { getHumanFriendlyRuntimeName } from "../runtimes";
import { functionsOrigin, functionsV2Origin } from "../../../api";
import { logger } from "../../../logger";
import * as backend from "../backend";
import * as cloudtasks from "../../../gcp/cloudtasks";
import * as deploymentTool from "../../../deploymentTool";
import * as gcf from "../../../gcp/cloudfunctions";
import * as gcfV2 from "../../../gcp/cloudfunctionsv2";
Expand Down Expand Up @@ -227,6 +228,17 @@ export class Fabricator {
})
.catch(rethrowAs(endpoint, "set invoker"));
}
} else if (backend.isTaskQueueTriggered(endpoint)) {
// Like HTTPS triggers, taskQueueTriggers have an invoker, but unlike HTTPS they don't default
// public.
const invoker = endpoint.taskQueueTrigger.invoker;
if (invoker && !invoker.includes("private")) {
await this.executor
.run(async () => {
await gcf.setInvokerCreate(endpoint.project, backend.functionName(endpoint), invoker);
})
.catch(rethrowAs(endpoint, "set invoker"));
}
}
}

Expand Down Expand Up @@ -280,6 +292,17 @@ export class Fabricator {
.run(() => run.setInvokerCreate(endpoint.project, serviceName, invoker))
.catch(rethrowAs(endpoint, "set invoker"));
}
} else if (backend.isTaskQueueTriggered(endpoint)) {
// Like HTTPS triggers, taskQueueTriggers have an invoker, but unlike HTTPS they don't default
// public.
const invoker = endpoint.taskQueueTrigger.invoker;
if (invoker && !invoker.includes("private")) {
await this.executor
.run(async () => {
await gcf.setInvokerCreate(endpoint.project, backend.functionName(endpoint), invoker);
})
.catch(rethrowAs(endpoint, "set invoker"));
}
}

await this.setConcurrency(
Expand Down Expand Up @@ -309,16 +332,15 @@ export class Fabricator {
.catch(rethrowAs(endpoint, "update"));

endpoint.uri = resultFunction?.httpsTrigger?.url;
if (backend.isHttpsTriggered(endpoint) && endpoint.httpsTrigger.invoker) {
let invoker: string[] | undefined;
if (backend.isHttpsTriggered(endpoint)) {
invoker = endpoint.httpsTrigger.invoker;
} else if (backend.isTaskQueueTriggered(endpoint)) {
invoker = endpoint.taskQueueTrigger.invoker;
}
if (invoker) {
await this.executor
.run(async () => {
await gcf.setInvokerUpdate(
endpoint.project,
backend.functionName(endpoint),
endpoint.httpsTrigger.invoker!
);
return;
})
.run(() => gcf.setInvokerUpdate(endpoint.project, backend.functionName(endpoint), invoker!))
.catch(rethrowAs(endpoint, "set invoker"));
}
}
Expand All @@ -338,7 +360,7 @@ export class Fabricator {
delete apiFunction.eventTrigger.pubsubTopic;
}

const resultFunction = (await this.functionExecutor
const resultFunction = await this.functionExecutor
.run(async () => {
const op: { name: string } = await gcfV2.updateFunction(apiFunction);
return await poller.pollOperation<gcfV2.CloudFunction>({
Expand All @@ -347,15 +369,19 @@ export class Fabricator {
operationResourceName: op.name,
});
})
.catch(rethrowAs(endpoint, "update"))) as gcfV2.CloudFunction;
.catch(rethrowAs(endpoint, "update"));

endpoint.uri = resultFunction.serviceConfig.uri;
const serviceName = resultFunction.serviceConfig.service!;
if (backend.isHttpsTriggered(endpoint) && endpoint.httpsTrigger.invoker) {
let invoker: string[] | undefined;
if (backend.isHttpsTriggered(endpoint)) {
invoker = endpoint.httpsTrigger.invoker;
} else if (backend.isTaskQueueTriggered(endpoint)) {
invoker = endpoint.taskQueueTrigger.invoker;
}
if (invoker) {
await this.executor
.run(() =>
run.setInvokerUpdate(endpoint.project, serviceName, endpoint.httpsTrigger.invoker!)
)
.run(() => run.setInvokerUpdate(endpoint.project, serviceName, invoker!))
.catch(rethrowAs(endpoint, "set invoker"));
}

Expand Down Expand Up @@ -443,7 +469,7 @@ export class Fabricator {
}
assertExhaustive(endpoint.platform);
} else if (backend.isTaskQueueTriggered(endpoint)) {
await this.deleteTaskQueue(endpoint);
await this.disableTaskQueue(endpoint);
}
}

Expand All @@ -461,10 +487,19 @@ export class Fabricator {
);
}

upsertTaskQueue(endpoint: backend.Endpoint & backend.TaskQueueTriggered): Promise<void> {
return Promise.reject(
new reporter.DeploymentError(endpoint, "upsert task queue", new Error("Not implemented"))
);
async upsertTaskQueue(endpoint: backend.Endpoint & backend.TaskQueueTriggered): Promise<void> {
const queue = cloudtasks.queueFromEndpoint(endpoint);
await this.executor
.run(() => cloudtasks.upsertQueue(queue))
.catch(rethrowAs(endpoint, "upsert task queue"));

// Note: should we split setTrigger into createTrigger and updateTrigger so we can avoid a
// getIamPolicy on create?
if (endpoint.taskQueueTrigger.invoker) {
await this.executor
.run(() => cloudtasks.setEnqueuer(queue.name, endpoint.taskQueueTrigger.invoker!))
.catch(rethrowAs(endpoint, "set invoker"));
}
}

async deleteScheduleV1(endpoint: backend.Endpoint & backend.ScheduleTriggered): Promise<void> {
Expand All @@ -484,10 +519,14 @@ export class Fabricator {
);
}

deleteTaskQueue(endpoint: backend.Endpoint & backend.TaskQueueTriggered): Promise<void> {
return Promise.reject(
new reporter.DeploymentError(endpoint, "delete task queue", new Error("Not implemented"))
);
async disableTaskQueue(endpoint: backend.Endpoint & backend.TaskQueueTriggered): Promise<void> {
const update = {
name: cloudtasks.queueNameForEndpoint(endpoint),
state: "DISABLED" as cloudtasks.State,
};
await this.executor
.run(() => cloudtasks.updateQueue(update))
.catch(rethrowAs(endpoint, "disable task queue"));
}

logOpStart(op: string, endpoint: backend.Endpoint): void {
Expand Down
2 changes: 2 additions & 0 deletions src/deploy/functions/release/planner.ts
Expand Up @@ -204,6 +204,8 @@ export function checkForIllegalUpdate(want: backend.Endpoint, have: backend.Endp
return "a background triggered";
} else if (backend.isScheduleTriggered(e)) {
return "a scheduled";
} else if (backend.isTaskQueueTriggered(e)) {
return "a task queue";
}
// Unfortunately TypeScript isn't like Scala and I can't prove to it
// that all cases have been handled
Expand Down
2 changes: 1 addition & 1 deletion src/deploy/functions/release/reporter.ts
Expand Up @@ -24,7 +24,7 @@ export type OperationType =
| "upsert schedule"
| "delete schedule"
| "upsert task queue"
| "delete task queue"
| "disable task queue"
| "create topic"
| "delete topic"
| "set invoker"
Expand Down
6 changes: 3 additions & 3 deletions src/deploy/functions/runtimes/discovery/v1alpha1.ts
Expand Up @@ -148,7 +148,7 @@ function parseEndpoints(
} else if (backend.isTaskQueueTriggered(ep)) {
assertKeyTypes(prefix + ".taskQueueTrigger", ep.taskQueueTrigger, {
rateLimits: "object",
retryPolicy: "object",
retryConfig: "object",
invoker: "array",
});
if (ep.taskQueueTrigger.rateLimits) {
Expand All @@ -158,8 +158,8 @@ function parseEndpoints(
maxDispatchesPerSecond: "number",
});
}
if (ep.taskQueueTrigger.retryPolicy) {
assertKeyTypes(prefix + ".taskQueueTrigger.retryPolicy", ep.taskQueueTrigger.retryPolicy, {
if (ep.taskQueueTrigger.retryConfig) {
assertKeyTypes(prefix + ".taskQueueTrigger.retryConfig", ep.taskQueueTrigger.retryConfig, {
maxAttempts: "number",
maxRetryDuration: "string",
minBackoff: "string",
Expand Down
2 changes: 1 addition & 1 deletion src/deploy/functions/runtimes/node/parseTriggers.ts
Expand Up @@ -65,7 +65,7 @@ export interface TriggerAnnotation {
maxConcurrentDispatches?: number;
maxDispatchesPerSecond?: number;
};
retryPolicy?: {
retryConfig?: {
maxAttempts?: number;
maxRetryDuration?: proto.Duration;
minBackoff?: proto.Duration;
Expand Down

0 comments on commit 42dfd0c

Please sign in to comment.