Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parallel requests in the functions emulator #5149

Merged
merged 9 commits into from Nov 1, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -7,3 +7,4 @@
- Adds `--disable-triggers` flag to RTDB write commands.
- Default enables experiment to skip deploying unmodified functions (#5192)
- Default enables experiment to allow parameterized functions codebases (#5192)
- Fixes parallel requests in the functions emulator (#5149).
18 changes: 16 additions & 2 deletions scripts/triggers-end-to-end-tests/tests.ts
Expand Up @@ -122,6 +122,20 @@ describe("function triggers", () => {
await test.stopEmulators();
});

describe("https triggers", () => {
it("should handle parallel requests", async function (this) {
this.timeout(TEST_SETUP_TIMEOUT);

const [resp1, resp2] = await Promise.all([
test.invokeHttpFunction("httpsv2reaction"),
test.invokeHttpFunction("httpsv2reaction"),
]);

expect(resp1.status).to.eq(200);
expect(resp2.status).to.eq(200);
});
});

describe("database and firestore emulator triggers", () => {
it("should write to the database emulator", async function (this) {
this.timeout(EMULATOR_TEST_TIMEOUT);
Expand All @@ -131,7 +145,7 @@ describe("function triggers", () => {
});

it("should write to the firestore emulator", async function (this) {
this.timeout(EMULATOR_TEST_TIMEOUT);
this.timeout(EMULATOR_TEST_TIMEOUT * 2);

const response = await test.writeToFirestore();
expect(response.status).to.equal(200);
Expand All @@ -143,7 +157,7 @@ describe("function triggers", () => {
* fixture state handlers to complete before we check
* that state in the next test.
*/
await new Promise((resolve) => setTimeout(resolve, EMULATORS_WRITE_DELAY_MS));
await new Promise((resolve) => setTimeout(resolve, EMULATORS_WRITE_DELAY_MS * 2));
});

it("should have have triggered cloud functions", () => {
Expand Down
4 changes: 4 additions & 0 deletions scripts/triggers-end-to-end-tests/v2/index.js
Expand Up @@ -29,6 +29,10 @@ const START_DOCUMENT_NAME = "test/start";

admin.initializeApp();

exports.httpsv2reaction = functionsV2.https.onRequest((req, res) => {
res.send("httpsv2reaction");
});

exports.pubsubv2reaction = functionsV2.pubsub.onMessagePublished(PUBSUB_TOPIC, (cloudevent) => {
console.log(PUBSUB_FUNCTION_LOG);
console.log("Message", JSON.stringify(cloudevent.data.message.json));
Expand Down
20 changes: 18 additions & 2 deletions src/emulator/functionsRuntimeWorker.ts
Expand Up @@ -12,6 +12,9 @@ import { Serializable } from "child_process";
type LogListener = (el: EmulatorLog) => any;

export enum RuntimeWorkerState {
// Worker has been created but is not ready to accept work
CREATED = "CREATED",

// Worker is ready to accept new work
IDLE = "IDLE",

Expand All @@ -34,7 +37,7 @@ export class RuntimeWorker {
stateEvents: EventEmitter = new EventEmitter();

private logListeners: Array<LogListener> = [];
private _state: RuntimeWorkerState = RuntimeWorkerState.IDLE;
private _state: RuntimeWorkerState = RuntimeWorkerState.CREATED;

constructor(key: string, runtime: FunctionsRuntimeInstance) {
this.id = uuid.v4();
Expand Down Expand Up @@ -86,6 +89,10 @@ export class RuntimeWorker {
return lines[lines.length - 1];
}

readyForWork(): void {
this.state = RuntimeWorkerState.IDLE;
}

sendDebugMsg(debug: FunctionsRuntimeBundle["debug"]): Promise<void> {
return new Promise((resolve, reject) => {
this.runtime.process.send(JSON.stringify(debug), (err) => {
Expand Down Expand Up @@ -178,7 +185,11 @@ export class RuntimeWorker {
path: "/__/health",
socketPath: this.runtime.socketPath,
},
() => resolve()
() => {
// Set the worker state to IDLE for new work
this.readyForWork();
resolve();
}
)
.end();
req.on("error", (error) => {
Expand Down Expand Up @@ -323,6 +334,11 @@ export class RuntimeWorkerPool {
return;
}

/**
* Adds a worker to the pool.
* Caller must set the worker status to ready by calling
* `worker.readyForWork()` or `worker.waitForSocketReady()`.
*/
addWorker(
triggerId: string | undefined,
runtime: FunctionsRuntimeInstance,
Expand Down
37 changes: 30 additions & 7 deletions src/test/emulators/functionsRuntimeWorker.spec.ts
Expand Up @@ -41,6 +41,7 @@ class MockRuntimeInstance implements FunctionsRuntimeInstance {
*/
class WorkerStateCounter {
counts: { [state in RuntimeWorkerState]: number } = {
CREATED: 0,
IDLE: 0,
BUSY: 0,
FINISHING: 0,
Expand All @@ -49,6 +50,9 @@ class WorkerStateCounter {

constructor(worker: RuntimeWorker) {
this.increment(worker.state);
worker.stateEvents.on(RuntimeWorkerState.CREATED, () => {
this.increment(RuntimeWorkerState.CREATED);
});
worker.stateEvents.on(RuntimeWorkerState.IDLE, () => {
this.increment(RuntimeWorkerState.IDLE);
});
Expand All @@ -68,67 +72,79 @@ class WorkerStateCounter {
}

get total() {
return this.counts.IDLE + this.counts.BUSY + this.counts.FINISHING + this.counts.FINISHED;
return (
this.counts.CREATED +
this.counts.IDLE +
this.counts.BUSY +
this.counts.FINISHING +
this.counts.FINISHED
);
}
}

describe("FunctionsRuntimeWorker", () => {
const workerPool = new RuntimeWorkerPool();

describe("RuntimeWorker", () => {
it("goes from idle --> busy --> idle in normal operation", async () => {
it("goes from created --> idle --> busy --> idle in normal operation", async () => {
const scope = nock("http://localhost").get("/").reply(200);

const worker = new RuntimeWorker(workerPool.getKey("trigger"), new MockRuntimeInstance());
const counter = new WorkerStateCounter(worker);

worker.readyForWork();
await worker.request(
{ method: "GET", path: "/" },
httpMocks.createResponse({ eventEmitter: EventEmitter })
);
scope.done();

expect(counter.counts.CREATED).to.eql(1);
expect(counter.counts.BUSY).to.eql(1);
expect(counter.counts.IDLE).to.eql(2);
expect(counter.total).to.eql(3);
expect(counter.total).to.eql(4);
});

it("goes from idle --> busy --> finished when there's an error", async () => {
it("goes from created --> idle --> busy --> finished when there's an error", async () => {
const scope = nock("http://localhost").get("/").replyWithError("boom");

const worker = new RuntimeWorker(workerPool.getKey("trigger"), new MockRuntimeInstance());
const counter = new WorkerStateCounter(worker);

worker.readyForWork();
colerogers marked this conversation as resolved.
Show resolved Hide resolved
await worker.request(
{ method: "GET", path: "/" },
httpMocks.createResponse({ eventEmitter: EventEmitter })
);
scope.done();

expect(counter.counts.CREATED).to.eql(1);
expect(counter.counts.IDLE).to.eql(1);
expect(counter.counts.BUSY).to.eql(1);
expect(counter.counts.FINISHED).to.eql(1);
expect(counter.total).to.eql(3);
expect(counter.total).to.eql(4);
});

it("goes from busy --> finishing --> finished when marked", async () => {
it("goes from created --> busy --> finishing --> finished when marked", async () => {
const scope = nock("http://localhost").get("/").replyWithError("boom");

const worker = new RuntimeWorker(workerPool.getKey("trigger"), new MockRuntimeInstance());
const counter = new WorkerStateCounter(worker);

worker.readyForWork();
const resp = httpMocks.createResponse({ eventEmitter: EventEmitter });
resp.on("end", () => {
worker.state = RuntimeWorkerState.FINISHING;
});
await worker.request({ method: "GET", path: "/" }, resp);
scope.done();

expect(counter.counts.CREATED).to.eql(1);
expect(counter.counts.IDLE).to.eql(1);
expect(counter.counts.BUSY).to.eql(1);
expect(counter.counts.FINISHING).to.eql(1);
expect(counter.counts.FINISHED).to.eql(1);
expect(counter.total).to.eql(4);
expect(counter.total).to.eql(5);
});
});

Expand All @@ -144,6 +160,7 @@ describe("FunctionsRuntimeWorker", () => {

// Add a worker and make sure it's there
const worker = pool.addWorker(trigger, new MockRuntimeInstance());
worker.readyForWork();
const triggerWorkers = pool.getTriggerWorkers(trigger);
expect(triggerWorkers.length).length.to.eq(1);
expect(pool.getIdleWorker(trigger)).to.eql(worker);
Expand All @@ -170,6 +187,7 @@ describe("FunctionsRuntimeWorker", () => {
// Add a worker to the pool that's destined to fail.
const scope = nock("http://localhost").get("/").replyWithError("boom");
const worker = pool.addWorker(trigger, new MockRuntimeInstance());
worker.readyForWork();
expect(pool.getIdleWorker(trigger)).to.eql(worker);

// Send request to the worker. Request should fail, killing the worker.
Expand All @@ -188,9 +206,11 @@ describe("FunctionsRuntimeWorker", () => {
const trigger = "trigger1";

const busyWorker = pool.addWorker(trigger, new MockRuntimeInstance());
busyWorker.readyForWork();
const busyWorkerCounter = new WorkerStateCounter(busyWorker);

const idleWorker = pool.addWorker(trigger, new MockRuntimeInstance());
idleWorker.readyForWork();
const idleWorkerCounter = new WorkerStateCounter(idleWorker);

// Add a worker to the pool that's destined to fail.
Expand All @@ -217,9 +237,11 @@ describe("FunctionsRuntimeWorker", () => {
const trigger = "trigger1";

const busyWorker = pool.addWorker(trigger, new MockRuntimeInstance());
busyWorker.readyForWork();
const busyWorkerCounter = new WorkerStateCounter(busyWorker);

const idleWorker = pool.addWorker(trigger, new MockRuntimeInstance());
idleWorker.readyForWork();
const idleWorkerCounter = new WorkerStateCounter(idleWorker);

// Add a worker to the pool that's destined to fail.
Expand Down Expand Up @@ -248,6 +270,7 @@ describe("FunctionsRuntimeWorker", () => {

const pool = new RuntimeWorkerPool(FunctionsExecutionMode.SEQUENTIAL);
const worker = pool.addWorker(trigger1, new MockRuntimeInstance());
worker.readyForWork();

const resp = httpMocks.createResponse({ eventEmitter: EventEmitter });
resp.on("end", () => {
Expand Down