Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parallel requests in the functions emulator #5149

Merged
merged 9 commits into from Nov 1, 2022
10 changes: 10 additions & 0 deletions scripts/integration-helpers/framework.ts
Expand Up @@ -45,6 +45,7 @@ const AUTH_BLOCKING_CREATE_V2_LOG =
"========== AUTH BLOCKING CREATE V2 FUNCTION METADATA ==========";
const AUTH_BLOCKING_SIGN_IN_V2_LOG =
"========== AUTH BLOCKING SIGN IN V2 FUNCTION METADATA ==========";
const HTTPS_V2_FUNCTION_LOG = "========== HTTPS V2 FUNCTION ==========";

interface ConnectionInfo {
host: string;
Expand Down Expand Up @@ -143,6 +144,7 @@ export class TriggerEndToEndTest extends EmulatorEndToEndTest {
authBlockingCreateV2TriggerCount = 0;
authBlockingSignInV2TriggerCount = 0;
rtdbV2TriggerCount = 0;
httpsV2TriggerCount = 0;

rtdbFromFirestore = false;
firestoreFromRtdb = false;
Expand Down Expand Up @@ -179,6 +181,7 @@ export class TriggerEndToEndTest extends EmulatorEndToEndTest {
this.authBlockingCreateV2TriggerCount = 0;
this.authBlockingSignInV2TriggerCount = 0;
this.rtdbV2TriggerCount = 0;
this.httpsV2TriggerCount = 0;
}

/*
Expand Down Expand Up @@ -274,6 +277,9 @@ export class TriggerEndToEndTest extends EmulatorEndToEndTest {
if (data.includes(RTDB_V2_FUNCTION_LOG)) {
this.rtdbV2TriggerCount++;
}
if (data.includes(HTTPS_V2_FUNCTION_LOG)) {
this.httpsV2TriggerCount++;
}
});

return startEmulators;
Expand Down Expand Up @@ -336,6 +342,10 @@ export class TriggerEndToEndTest extends EmulatorEndToEndTest {
});
}

triggerHttpsFunction(): Promise<Response> {
return this.invokeHttpFunction("triggerHttpsFunction");
}

colerogers marked this conversation as resolved.
Show resolved Hide resolved
createUserFromAuth(): Promise<Response> {
return this.invokeHttpFunction("createUserFromAuth");
}
Expand Down
23 changes: 21 additions & 2 deletions scripts/triggers-end-to-end-tests/tests.ts
Expand Up @@ -122,6 +122,25 @@ describe("function triggers", () => {
await test.stopEmulators();
});

describe("https triggers", () => {
it("should handle parallel requests", async function (this) {
this.timeout(EMULATOR_TEST_TIMEOUT);

const [resp1, resp2] = await Promise.all([
test.triggerHttpsFunction(),
test.triggerHttpsFunction(),
]);

expect(resp1.status).to.eq(200);
expect(resp2.status).to.eq(200);
await new Promise((resolve) => setTimeout(resolve, EMULATORS_WRITE_DELAY_MS));
});

it("should have triggered the cloud functions", () => {
expect(test.httpsV2TriggerCount).to.eq(2);
});
});

describe("database and firestore emulator triggers", () => {
it("should write to the database emulator", async function (this) {
this.timeout(EMULATOR_TEST_TIMEOUT);
Expand All @@ -131,7 +150,7 @@ describe("function triggers", () => {
});

it("should write to the firestore emulator", async function (this) {
this.timeout(EMULATOR_TEST_TIMEOUT);
this.timeout(EMULATOR_TEST_TIMEOUT * 2);

const response = await test.writeToFirestore();
expect(response.status).to.equal(200);
Expand All @@ -143,7 +162,7 @@ describe("function triggers", () => {
* fixture state handlers to complete before we check
* that state in the next test.
*/
await new Promise((resolve) => setTimeout(resolve, EMULATORS_WRITE_DELAY_MS));
await new Promise((resolve) => setTimeout(resolve, EMULATORS_WRITE_DELAY_MS * 2));
});

it("should have have triggered cloud functions", () => {
Expand Down
24 changes: 24 additions & 0 deletions scripts/triggers-end-to-end-tests/triggers/index.js
Expand Up @@ -8,9 +8,21 @@ const {
createUserWithEmailAndPassword,
signInWithEmailAndPassword,
} = require("firebase/auth");
const fs = require("fs");
const path = require("path");
const fetch = require("node-fetch");

// load the config from the parent dir
const filename = path.join(__dirname, "/../firebase.json");
const data = fs.readFileSync(filename, "utf8");
const config = JSON.parse(data);

const FIREBASE_PROJECT = process.env.FBTOOLS_TARGET_PROJECT || "";

const FUNCTIONS_PORT = config.emulators.functions?.port || "9002";

const FUNCTIONS_REGION = "us-central1";

/*
* We install onWrite triggers for START_DOCUMENT_NAME in both the firestore and
* database emulators. From each respective onWrite trigger, we write a document
Expand Down Expand Up @@ -43,6 +55,18 @@ const app = initializeApp(
const auth = getAuth(app);
connectAuthEmulator(auth, `http://${process.env.FIREBASE_AUTH_EMULATOR_HOST}`);

function invokeHttpsFunction(name) {
const url = `http://localhost:${[FUNCTIONS_PORT, FIREBASE_PROJECT, FUNCTIONS_REGION, name].join(
"/"
)}`;
return fetch(url);
}

exports.triggerHttpsFunction = functions.https.onRequest(async (req, res) => {
await invokeHttpsFunction("httpsv2reaction");
res.json({ triggered: true });
});
colerogers marked this conversation as resolved.
Show resolved Hide resolved

exports.deleteFromFirestore = functions.https.onRequest(async (req, res) => {
await admin.firestore().doc(START_DOCUMENT_NAME).delete();
res.json({ deleted: true });
Expand Down
6 changes: 6 additions & 0 deletions scripts/triggers-end-to-end-tests/v2/index.js
Expand Up @@ -23,12 +23,18 @@ const AUTH_BLOCKING_CREATE_V2_LOG =
const AUTH_BLOCKING_SIGN_IN_V2_LOG =
"========== AUTH BLOCKING SIGN IN V2 FUNCTION METADATA ==========";
const RTDB_LOG = "========== RTDB V2 FUNCTION ==========";
const HTTPS_FUNCTION_LOG = "========== HTTPS V2 FUNCTION ==========";

const PUBSUB_TOPIC = "test-topic";
const START_DOCUMENT_NAME = "test/start";

admin.initializeApp();

exports.httpsv2reaction = functionsV2.https.onRequest((req, res) => {
console.log(HTTPS_FUNCTION_LOG);
colerogers marked this conversation as resolved.
Show resolved Hide resolved
res.send();
});

exports.pubsubv2reaction = functionsV2.pubsub.onMessagePublished(PUBSUB_TOPIC, (cloudevent) => {
console.log(PUBSUB_FUNCTION_LOG);
console.log("Message", JSON.stringify(cloudevent.data.message.json));
Expand Down
20 changes: 18 additions & 2 deletions src/emulator/functionsRuntimeWorker.ts
Expand Up @@ -12,6 +12,9 @@ import { Serializable } from "child_process";
type LogListener = (el: EmulatorLog) => any;

export enum RuntimeWorkerState {
// Worker has been created but is not ready to accept work
CREATED = "CREATED",

// Worker is ready to accept new work
IDLE = "IDLE",

Expand All @@ -34,7 +37,7 @@ export class RuntimeWorker {
stateEvents: EventEmitter = new EventEmitter();

private logListeners: Array<LogListener> = [];
private _state: RuntimeWorkerState = RuntimeWorkerState.IDLE;
private _state: RuntimeWorkerState = RuntimeWorkerState.CREATED;

constructor(key: string, runtime: FunctionsRuntimeInstance) {
this.id = uuid.v4();
Expand Down Expand Up @@ -86,6 +89,10 @@ export class RuntimeWorker {
return lines[lines.length - 1];
}

readyForWork(): void {
this.state = RuntimeWorkerState.IDLE;
}

sendDebugMsg(debug: FunctionsRuntimeBundle["debug"]): Promise<void> {
return new Promise((resolve, reject) => {
this.runtime.process.send(JSON.stringify(debug), (err) => {
Expand Down Expand Up @@ -178,7 +185,11 @@ export class RuntimeWorker {
path: "/__/health",
socketPath: this.runtime.socketPath,
},
() => resolve()
() => {
// Set the worker state to IDLE for new work
this.readyForWork();
resolve();
}
)
.end();
req.on("error", (error) => {
Expand Down Expand Up @@ -323,6 +334,11 @@ export class RuntimeWorkerPool {
return;
}

/**
* Adds a worker to the pool.
* Caller must set the worker status to ready by calling
* `worker.readyForWork()` or `worker.waitForSocketReady()`.
*/
addWorker(
triggerId: string | undefined,
runtime: FunctionsRuntimeInstance,
Expand Down
31 changes: 27 additions & 4 deletions src/test/emulators/functionsRuntimeWorker.spec.ts
Expand Up @@ -41,6 +41,7 @@ class MockRuntimeInstance implements FunctionsRuntimeInstance {
*/
class WorkerStateCounter {
counts: { [state in RuntimeWorkerState]: number } = {
CREATED: 0,
IDLE: 0,
BUSY: 0,
FINISHING: 0,
Expand All @@ -49,6 +50,9 @@ class WorkerStateCounter {

constructor(worker: RuntimeWorker) {
this.increment(worker.state);
worker.stateEvents.on(RuntimeWorkerState.CREATED, () => {
this.increment(RuntimeWorkerState.CREATED);
});
worker.stateEvents.on(RuntimeWorkerState.IDLE, () => {
this.increment(RuntimeWorkerState.IDLE);
});
Expand All @@ -68,7 +72,13 @@ class WorkerStateCounter {
}

get total() {
return this.counts.IDLE + this.counts.BUSY + this.counts.FINISHING + this.counts.FINISHED;
return (
this.counts.CREATED +
this.counts.IDLE +
this.counts.BUSY +
this.counts.FINISHING +
this.counts.FINISHED
);
}
}

Expand All @@ -82,15 +92,17 @@ describe("FunctionsRuntimeWorker", () => {
const worker = new RuntimeWorker(workerPool.getKey("trigger"), new MockRuntimeInstance());
const counter = new WorkerStateCounter(worker);

worker.readyForWork();
await worker.request(
{ method: "GET", path: "/" },
httpMocks.createResponse({ eventEmitter: EventEmitter })
);
scope.done();

expect(counter.counts.CREATED).to.eql(1);
expect(counter.counts.BUSY).to.eql(1);
expect(counter.counts.IDLE).to.eql(2);
expect(counter.total).to.eql(3);
expect(counter.total).to.eql(4);
});

it("goes from idle --> busy --> finished when there's an error", async () => {
Expand All @@ -99,16 +111,18 @@ describe("FunctionsRuntimeWorker", () => {
const worker = new RuntimeWorker(workerPool.getKey("trigger"), new MockRuntimeInstance());
const counter = new WorkerStateCounter(worker);

worker.readyForWork();
colerogers marked this conversation as resolved.
Show resolved Hide resolved
await worker.request(
{ method: "GET", path: "/" },
httpMocks.createResponse({ eventEmitter: EventEmitter })
);
scope.done();

expect(counter.counts.CREATED).to.eql(1);
expect(counter.counts.IDLE).to.eql(1);
expect(counter.counts.BUSY).to.eql(1);
expect(counter.counts.FINISHED).to.eql(1);
expect(counter.total).to.eql(3);
expect(counter.total).to.eql(4);
});

it("goes from busy --> finishing --> finished when marked", async () => {
Expand All @@ -117,18 +131,20 @@ describe("FunctionsRuntimeWorker", () => {
const worker = new RuntimeWorker(workerPool.getKey("trigger"), new MockRuntimeInstance());
const counter = new WorkerStateCounter(worker);

worker.readyForWork();
const resp = httpMocks.createResponse({ eventEmitter: EventEmitter });
resp.on("end", () => {
worker.state = RuntimeWorkerState.FINISHING;
});
await worker.request({ method: "GET", path: "/" }, resp);
scope.done();

expect(counter.counts.CREATED).to.eql(1);
expect(counter.counts.IDLE).to.eql(1);
expect(counter.counts.BUSY).to.eql(1);
expect(counter.counts.FINISHING).to.eql(1);
expect(counter.counts.FINISHED).to.eql(1);
expect(counter.total).to.eql(4);
expect(counter.total).to.eql(5);
});
});

Expand All @@ -144,6 +160,7 @@ describe("FunctionsRuntimeWorker", () => {

// Add a worker and make sure it's there
const worker = pool.addWorker(trigger, new MockRuntimeInstance());
worker.readyForWork();
const triggerWorkers = pool.getTriggerWorkers(trigger);
expect(triggerWorkers.length).length.to.eq(1);
expect(pool.getIdleWorker(trigger)).to.eql(worker);
Expand All @@ -170,6 +187,7 @@ describe("FunctionsRuntimeWorker", () => {
// Add a worker to the pool that's destined to fail.
const scope = nock("http://localhost").get("/").replyWithError("boom");
const worker = pool.addWorker(trigger, new MockRuntimeInstance());
worker.readyForWork();
expect(pool.getIdleWorker(trigger)).to.eql(worker);

// Send request to the worker. Request should fail, killing the worker.
Expand All @@ -188,9 +206,11 @@ describe("FunctionsRuntimeWorker", () => {
const trigger = "trigger1";

const busyWorker = pool.addWorker(trigger, new MockRuntimeInstance());
busyWorker.readyForWork();
const busyWorkerCounter = new WorkerStateCounter(busyWorker);

const idleWorker = pool.addWorker(trigger, new MockRuntimeInstance());
idleWorker.readyForWork();
const idleWorkerCounter = new WorkerStateCounter(idleWorker);

// Add a worker to the pool that's destined to fail.
Expand All @@ -217,9 +237,11 @@ describe("FunctionsRuntimeWorker", () => {
const trigger = "trigger1";

const busyWorker = pool.addWorker(trigger, new MockRuntimeInstance());
busyWorker.readyForWork();
const busyWorkerCounter = new WorkerStateCounter(busyWorker);

const idleWorker = pool.addWorker(trigger, new MockRuntimeInstance());
idleWorker.readyForWork();
const idleWorkerCounter = new WorkerStateCounter(idleWorker);

// Add a worker to the pool that's destined to fail.
Expand Down Expand Up @@ -248,6 +270,7 @@ describe("FunctionsRuntimeWorker", () => {

const pool = new RuntimeWorkerPool(FunctionsExecutionMode.SEQUENTIAL);
const worker = pool.addWorker(trigger1, new MockRuntimeInstance());
worker.readyForWork();

const resp = httpMocks.createResponse({ eventEmitter: EventEmitter });
resp.on("end", () => {
Expand Down