Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix websocket and webtransport multipart callbacks (#698) #699

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 14 additions & 24 deletions lib/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export interface SendOptions {

type ReadyState = "opening" | "open" | "closing" | "closed";

type SendCallback = (transport: Transport) => void;

export class Socket extends EventEmitter {
public readonly protocol: number;
// TODO for the next major release: do not keep the reference to the first HTTP request, as it stays in memory
Expand All @@ -27,8 +29,8 @@ export class Socket extends EventEmitter {
private upgrading = false;
private upgraded = false;
private writeBuffer: Packet[] = [];
private packetsFn: Array<() => void> = [];
private sentCallbackFn: any[] = [];
private packetsFn: SendCallback[] = [];
private sentCallbackFn: SendCallback[][] = [];
private cleanupFn: any[] = [];
private pingTimeoutTimer;
private pingIntervalTimer;
Expand Down Expand Up @@ -395,19 +397,11 @@ export class Socket extends EventEmitter {
// the message was sent successfully, execute the callback
const onDrain = () => {
if (this.sentCallbackFn.length > 0) {
const seqFn = this.sentCallbackFn.splice(0, 1)[0];
if ("function" === typeof seqFn) {
debug("executing send callback");
seqFn(this.transport);
} else if (Array.isArray(seqFn)) {
debug("executing batch send callback");
const l = seqFn.length;
let i = 0;
for (; i < l; i++) {
if ("function" === typeof seqFn[i]) {
seqFn[i](this.transport);
}
}
debug("executing batch send callback");
const seqFn = this.sentCallbackFn.shift();
const l = seqFn.length;
for (let i = 0; i < l; i++) {
seqFn[i](this.transport);
}
}
};
Expand All @@ -428,7 +422,7 @@ export class Socket extends EventEmitter {
* @return {Socket} for chaining
* @api public
*/
public send(data: RawData, options?: SendOptions, callback?: () => void) {
public send(data: RawData, options?: SendOptions, callback?: SendCallback) {
this.sendPacket("message", data, options, callback);
return this;
}
Expand All @@ -440,7 +434,7 @@ export class Socket extends EventEmitter {
* @param options
* @param callback
*/
public write(data: RawData, options?: SendOptions, callback?: () => void) {
public write(data: RawData, options?: SendOptions, callback?: SendCallback) {
this.sendPacket("message", data, options, callback);
return this;
}
Expand All @@ -459,7 +453,7 @@ export class Socket extends EventEmitter {
type: PacketType,
data?: RawData,
options: SendOptions = {},
callback?: () => void
callback?: SendCallback
) {
if ("function" === typeof options) {
callback = options;
Expand All @@ -485,7 +479,7 @@ export class Socket extends EventEmitter {
this.writeBuffer.push(packet);

// add send callback to object, if defined
if (callback) this.packetsFn.push(callback);
if ("function" === typeof callback) this.packetsFn.push(callback);

this.flush();
}
Expand All @@ -507,11 +501,7 @@ export class Socket extends EventEmitter {
this.server.emit("flush", this, this.writeBuffer);
const wbuf = this.writeBuffer;
this.writeBuffer = [];
if (!this.transport.supportsFraming) {
this.sentCallbackFn.push(this.packetsFn);
} else {
this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn);
}
this.sentCallbackFn.push(this.packetsFn);
this.packetsFn = [];
this.transport.send(wbuf);
this.emit("drain");
Expand Down
5 changes: 0 additions & 5 deletions lib/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,6 @@ export abstract class Transport extends EventEmitter {
this.emit("close");
}

/**
* Advertise framing support.
*/
abstract get supportsFraming();

/**
* The name of the transport.
*/
Expand Down
4 changes: 0 additions & 4 deletions lib/transports-uws/polling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ export class Polling extends Transport {
return "polling";
}

get supportsFraming() {
return false;
}

/**
* Overrides onRequest.
*
Expand Down
9 changes: 0 additions & 9 deletions lib/transports-uws/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,6 @@ export class WebSocket extends Transport {
return true;
}

/**
* Advertise framing support.
*
* @api public
*/
get supportsFraming() {
return true;
}

/**
* Writes a packet payload.
*
Expand Down
4 changes: 0 additions & 4 deletions lib/transports/polling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ export class Polling extends Transport {
return "polling";
}

get supportsFraming() {
return false;
}

/**
* Overrides onRequest.
*
Expand Down
9 changes: 0 additions & 9 deletions lib/transports/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,6 @@ export class WebSocket extends Transport {
return true;
}

/**
* Advertise framing support.
*
* @api public
*/
get supportsFraming() {
return true;
}

/**
* Writes a packet payload.
*
Expand Down
4 changes: 0 additions & 4 deletions lib/transports/webtransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ export class WebTransport extends Transport {
return "webtransport";
}

get supportsFraming() {
return true;
}

async send(packets) {
this.writable = false;

Expand Down
14 changes: 12 additions & 2 deletions test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -2759,13 +2759,23 @@ describe("server", () => {
});
});

it("should execute in multipart packet", (done) => {
it("should execute in multipart packet (websocket)", (done) => {
const engine = listen((port) => {
const socket = new ClientSocket(`ws://localhost:${port}`);
const socket = new ClientSocket(`ws://localhost:${port}`, {
transports: ["websocket"],
});
let i = 0;
let j = 0;

engine.on("connection", (conn) => {
conn.send("d", (transport) => {
i++;
});

conn.send("c", (transport) => {
i++;
});

conn.send("b", (transport) => {
i++;
});
Expand Down
15 changes: 15 additions & 0 deletions test/webtransport.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,21 @@ describe("WebTransport", () => {
});
});

it("should invoke send callbacks (server to client)", (done) => {
setup({}, async ({ engine, h3Server, socket, reader }) => {
const messageCount = 4;
let receivedCallbacks = 0;

for (let i = 0; i < messageCount; i++) {
socket.send("hello", () => {
if (++receivedCallbacks === messageCount) {
success(engine, h3Server, done);
}
});
}
});
});

it("should send some binary data (client to server)", (done) => {
setup({}, async ({ engine, h3Server, socket, writer }) => {
socket.on("data", (data) => {
Expand Down