Skip to content

Commit

Permalink
Storage Emulator Enhancement (#3809)
Browse files Browse the repository at this point in the history
* storage trigger works

* removing comment

* cleaning up request body code

* add types

* fixing comments

* removing wrapper, adding check on type field

* using Content-Type header

* linter

* nit
  • Loading branch information
colerogers committed Oct 19, 2021
1 parent 8f3a1ec commit 925bdc3
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 13 deletions.
17 changes: 14 additions & 3 deletions src/emulator/functionsEmulator.ts
Expand Up @@ -267,10 +267,21 @@ export class FunctionsEmulator implements EmulatorInstance {
};

const multicastHandler: express.RequestHandler = (req, res) => {
const reqBody = (req as RequestWithRawBody).rawBody;
const proto = JSON.parse(reqBody.toString());
const triggers = this.multicastTriggers[`${this.args.projectId}:${proto.eventType}`] || [];
const projectId = req.params.project_id;
const reqBody = (req as RequestWithRawBody).rawBody;
let proto = JSON.parse(reqBody.toString());
let triggerKey: string;
if (req.headers["content-type"]?.includes("cloudevent")) {
triggerKey = `${this.args.projectId}:${proto.type}`;

if (EventUtils.isBinaryCloudEvent(req)) {
proto = EventUtils.extractBinaryCloudEventContext(req);
proto.data = req.body;
}
} else {
triggerKey = `${this.args.projectId}:${proto.eventType}`;
}
const triggers = this.multicastTriggers[triggerKey] || [];

triggers.forEach((triggerId) => {
this.workQueue.submit(() => {
Expand Down
59 changes: 49 additions & 10 deletions src/emulator/storage/cloudFunctions.ts
@@ -1,18 +1,25 @@
import { EmulatorRegistry } from "../registry";
import { EmulatorInfo, Emulators } from "../types";
import * as request from "request";
import { EmulatorLogger } from "../emulatorLogger";
import { CloudStorageObjectMetadata, toSerializedDate } from "./metadata";
import { Client } from "../../apiv2";
import { StorageObjectData } from "@google/events/cloud/storage/v1/StorageObjectData";

type StorageCloudFunctionAction = "finalize" | "metadataUpdate" | "delete" | "archive";
const STORAGE_V2_ACTION_MAP: Record<StorageCloudFunctionAction, string> = {
finalize: "finalized",
metadataUpdate: "metadataUpdated",
delete: "deleted",
archive: "archived",
};

export class StorageCloudFunctions {
private logger = EmulatorLogger.forEmulator(Emulators.STORAGE);
private functionsEmulatorInfo?: EmulatorInfo;
private multicastOrigin = "";
private multicastPath = "";
private enabled = false;
private client?: Client;

constructor(private projectId: string) {
const functionsEmulator = EmulatorRegistry.get(Emulators.FUNCTIONS);
Expand All @@ -24,27 +31,40 @@ export class StorageCloudFunctions {
this.functionsEmulatorInfo
)}`;
this.multicastPath = `/functions/projects/${projectId}/trigger_multicast`;
this.client = new Client({ urlPrefix: this.multicastOrigin, auth: false });
}
}

public async dispatch(
action: StorageCloudFunctionAction,
object: CloudStorageObjectMetadata
): Promise<void> {
if (!this.enabled) return;

const multicastEventBody = this.createEventRequestBody(action, object);
if (!this.enabled) {
return;
}

const c = new Client({ urlPrefix: this.multicastOrigin, auth: false });
let res;
const errStatus: Array<number> = [];
let err: Error | undefined;
try {
res = await c.post(this.multicastPath, multicastEventBody);
/** Legacy Google Events */
const eventBody = this.createLegacyEventRequestBody(action, object);
const eventRes = await this.client!.post(this.multicastPath, eventBody);
if (eventRes.status !== 200) {
errStatus.push(eventRes.status);
}
/** Modern CloudEvents */
const cloudEventBody = this.createCloudEventRequestBody(action, object);
const cloudEventRes = await this.client!.post(this.multicastPath, cloudEventBody, {
headers: { "Content-Type": "application/cloudevents+json; charset=UTF-8" },
});
if (cloudEventRes.status !== 200) {
errStatus.push(cloudEventRes.status);
}
} catch (e) {
err = e;
err = e as Error;
}

if (err || res?.status != 200) {
if (err || errStatus.length > 0) {
this.logger.logLabeled(
"WARN",
"functions",
Expand All @@ -53,7 +73,8 @@ export class StorageCloudFunctions {
}
}

private createEventRequestBody(
/** Legacy Google Events type */
private createLegacyEventRequestBody(
action: StorageCloudFunctionAction,
objectMetadataPayload: ObjectMetadataPayload
): string {
Expand All @@ -70,6 +91,24 @@ export class StorageCloudFunctions {
data: objectMetadataPayload,
});
}

/** Modern CloudEvents type */
private createCloudEventRequestBody(
action: StorageCloudFunctionAction,
objectMetadataPayload: ObjectMetadataPayload
): string {
const ceAction = STORAGE_V2_ACTION_MAP[action];
if (!ceAction) {
throw new Error("Action is not definied as a CloudEvents action");
}
const data = (objectMetadataPayload as unknown) as StorageObjectData;
return JSON.stringify({
specVersion: 1,
type: `google.cloud.storage.object.v1.${ceAction}`,
source: `//storage.googleapis.com/projects/_/buckets/${objectMetadataPayload.bucket}/objects/${objectMetadataPayload.name}`,
data,
});
}
}

// From https://github.com/firebase/firebase-functions/blob/master/src/providers/storage.ts
Expand Down

0 comments on commit 925bdc3

Please sign in to comment.