Skip to content

Commit

Permalink
feat(sharded): add an option for dynamic private channels
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinKolarik committed Mar 13, 2024
1 parent dc1407f commit 033a358
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 13 deletions.
40 changes: 27 additions & 13 deletions lib/sharded-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,17 @@ export interface ShardedRedisAdapterOptions {
* The default value, useful when some rooms have a low number of clients (so only a few Socket.IO servers are notified).
*
* Only public rooms (i.e. not related to a particular Socket ID) are taken in account, because:
*
* - a lot of connected clients would mean a lot of subscription/unsubscription
* - the Socket ID attribute is ephemeral
*
* - "dynamic-private"
*
* Like "dynamic" but creates separate channels for private rooms as well. Useful when there is lots of 1:1 communication
* via socket.emit() calls.
*
* @default "dynamic"
*/
subscriptionMode?: "static" | "dynamic";
subscriptionMode?: "static" | "dynamic" | "dynamic-private";
}

/**
Expand Down Expand Up @@ -91,15 +95,13 @@ class ShardedRedisAdapter extends ClusterAdapter {

if (this.opts.subscriptionMode === "dynamic") {
this.on("create-room", (room) => {
const isPublicRoom = !this.sids.has(room);
if (isPublicRoom) {
if (this.shouldUseASeparateNamespace(room)) {
SSUBSCRIBE(this.subClient, this.dynamicChannel(room), handler);
}
});

this.on("delete-room", (room) => {
const isPublicRoom = !this.sids.has(room);
if (isPublicRoom) {
if (this.shouldUseASeparateNamespace(room)) {
SUNSUBSCRIBE(this.subClient, this.dynamicChannel(room));
}
});
Expand All @@ -111,8 +113,7 @@ class ShardedRedisAdapter extends ClusterAdapter {

if (this.opts.subscriptionMode === "dynamic") {
this.rooms.forEach((_sids, room) => {
const isPublicRoom = !this.sids.has(room);
if (isPublicRoom) {
if (this.shouldUseASeparateNamespace(room)) {
channels.push(this.dynamicChannel(room));
}
});
Expand All @@ -133,15 +134,19 @@ class ShardedRedisAdapter extends ClusterAdapter {
}

private computeChannel(message) {
const isDynamicChannel =
(this.opts.subscriptionMode === "dynamic" &&
!looksLikeASocketId(message.data.opts.rooms[0])) ||
this.opts.subscriptionMode === "dynamic-private";

// broadcast with ack can not use a dynamic channel, because the serverCount() method return the number of all
// servers, not only the ones where the given room exists
const useDynamicChannel =
this.opts.subscriptionMode === "dynamic" &&
const isSupportedMessageType =
message.type === MessageType.BROADCAST &&
message.data.requestId === undefined &&
message.data.opts.rooms.length === 1 &&
!looksLikeASocketId(message.data.opts.rooms[0]);
if (useDynamicChannel) {
message.data.opts.rooms.length === 1;

if (isDynamicChannel && isSupportedMessageType) {
return this.dynamicChannel(message.data.opts.rooms[0]);
} else {
return this.channel;
Expand Down Expand Up @@ -204,4 +209,13 @@ class ShardedRedisAdapter extends ClusterAdapter {
override serverCount(): Promise<number> {
return PUBSUB(this.pubClient, "SHARDNUMSUB", this.channel);
}

shouldUseASeparateNamespace(room: string): boolean {
const isPublicRoom = !this.sids.has(room);

return (
(this.opts.subscriptionMode === "dynamic" && isPublicRoom) ||
this.opts.subscriptionMode === "dynamic-private"
);
}
}
22 changes: 22 additions & 0 deletions test/test-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,28 @@ describe("@socket.io/redis-adapter", () => {
true
));

describe("[sharded] redis@4 standalone (dynamic subscription mode & dynamic private channels)", () =>
testSuite(
async () => {
const pubClient = createClient();
const subClient = pubClient.duplicate();

await Promise.all([pubClient.connect(), subClient.connect()]);

return [
createShardedAdapter(pubClient, subClient, {
subscriptionMode: "dynamic-private",
}),
() => {
pubClient.disconnect();
subClient.disconnect();
},
];
},
"redis@4",
true
));

describe("[sharded] redis@4 standalone (static subscription mode)", () =>
testSuite(
async () => {
Expand Down

0 comments on commit 033a358

Please sign in to comment.