Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: migrate deque to js-sdsl #1650

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 7 additions & 6 deletions lib/DataHandler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { NetStream, CommandItem, Respondable } from "./types";
import Deque = require("denque");
import { Deque } from "js-sdsl";
import { EventEmitter } from "events";
import Command from "./Command";
import { Debug } from "./utils";
Expand Down Expand Up @@ -91,11 +91,11 @@ export default class DataHandler {
);

if (!fillSubCommand(item.command, reply[2])) {
this.redis.commandQueue.unshift(item);
this.redis.commandQueue.pushFront(item);
}
} else if (Command.checkFlag("EXIT_SUBSCRIBER_MODE", item.command.name)) {
if (!fillUnsubCommand(item.command, reply[2])) {
this.redis.commandQueue.unshift(item);
this.redis.commandQueue.pushFront(item);
}
} else {
item.command.resolve(reply);
Expand Down Expand Up @@ -143,7 +143,7 @@ export default class DataHandler {
return;
}
if (!fillSubCommand(item.command, reply[2])) {
this.redis.commandQueue.unshift(item);
this.redis.commandQueue.pushFront(item);
}
break;
}
Expand All @@ -162,7 +162,7 @@ export default class DataHandler {
return;
}
if (!fillUnsubCommand(item.command, count)) {
this.redis.commandQueue.unshift(item);
this.redis.commandQueue.pushFront(item);
}
break;
}
Expand Down Expand Up @@ -207,7 +207,8 @@ export default class DataHandler {
}

private shiftCommand(reply: ReplyData | Error): CommandItem | null {
const item = this.redis.commandQueue.shift();
const item = this.redis.commandQueue.front();
this.redis.commandQueue.popFront();
if (!item) {
const message =
"Command queue state error. If you can reproduce this, please report it.";
Expand Down
21 changes: 12 additions & 9 deletions lib/Redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import {
import applyMixin from "./utils/applyMixin";
import Commander from "./utils/Commander";
import { defaults, noop } from "./utils/lodash";
import Deque = require("denque");
import { Deque } from "js-sdsl";
const debug = Debug("redis");

type RedisStatus =
Expand Down Expand Up @@ -97,7 +97,7 @@ class Redis extends Commander {
subscriber: boolean;
};
private commandQueue: Deque<CommandItem>;
private offlineQueue: Deque;
private offlineQueue: Deque<any>;
private connectionEpoch = 0;
private retryAttempts = 0;
private manuallyClosing = false;
Expand Down Expand Up @@ -463,7 +463,7 @@ class Redis extends Commander {
return command.promise;
}

if (command.name === "quit" && this.offlineQueue.length === 0) {
if (command.name === "quit" && this.offlineQueue.empty()) {
this.disconnect();
command.resolve(Buffer.from("OK"));
return command.promise;
Expand All @@ -480,7 +480,7 @@ class Redis extends Commander {
);
}

this.offlineQueue.push({
this.offlineQueue.pushBack({
command: command,
stream: stream,
select: this.condition.select,
Expand All @@ -507,7 +507,7 @@ class Redis extends Commander {
this.stream.write(command.toWritable(this.stream));
}

this.commandQueue.push({
this.commandQueue.pushBack({
command: command,
stream: stream,
select: this.condition.select,
Expand Down Expand Up @@ -750,20 +750,23 @@ class Redis extends Commander {
commandQueue: true,
});

let item;
if (options.offlineQueue) {
while ((item = this.offlineQueue.shift())) {
while (!this.offlineQueue.empty()) {
const item = this.offlineQueue.front();
this.offlineQueue.popFront();
item.command.reject(error);
}
}

if (options.commandQueue) {
if (this.commandQueue.length > 0) {
if (!this.commandQueue.empty()) {
if (this.stream) {
this.stream.removeAllListeners("data");
}

while ((item = this.commandQueue.shift())) {
while (!this.commandQueue.empty()) {
const item = this.commandQueue.front();
this.commandQueue.popFront();
item.command.reject(error);
}
}
Expand Down
11 changes: 6 additions & 5 deletions lib/cluster/DelayQueue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Debug } from "../utils";
import Deque = require("denque");
import { Deque } from "js-sdsl";

const debug = Debug("delayqueue");

Expand Down Expand Up @@ -29,7 +29,7 @@ export default class DelayQueue {
}

const queue = this.queues[bucket];
queue.push(item);
queue.pushBack(item);

if (!this.timeouts[bucket]) {
this.timeouts[bucket] = setTimeout(() => {
Expand All @@ -46,15 +46,16 @@ export default class DelayQueue {
if (!queue) {
return;
}
const { length } = queue;
const length = queue.size();
if (!length) {
return;
}
debug("send %d commands in %s queue", length, bucket);

this.queues[bucket] = null;
while (queue.length > 0) {
queue.shift()();
while (!queue.empty()) {
queue.front()();
queue.popFront();
}
}
}
19 changes: 10 additions & 9 deletions lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import {
RedisOptions,
weightSrvRecords,
} from "./util";
import Deque = require("denque");
import { Deque } from "js-sdsl";

const debug = Debug("cluster");

Expand Down Expand Up @@ -595,7 +595,7 @@ class Cluster extends Commander {
if (redis) {
redis.sendCommand(command, stream);
} else if (_this.options.enableOfflineQueue) {
_this.offlineQueue.push({
_this.offlineQueue.pushBack({
command: command,
stream: stream,
node: node,
Expand Down Expand Up @@ -768,19 +768,20 @@ class Cluster extends Commander {
* Flush offline queue with error.
*/
private flushQueue(error: Error) {
let item: OfflineQueueItem;
while ((item = this.offlineQueue.shift())) {
item.command.reject(error);
while (!this.offlineQueue.empty()) {
this.offlineQueue.front().command.reject(error);
this.offlineQueue.popFront();
}
}

private executeOfflineCommands() {
if (this.offlineQueue.length) {
debug("send %d commands in offline queue", this.offlineQueue.length);
if (!this.offlineQueue.empty()) {
debug("send %d commands in offline queue", this.offlineQueue.size());
const offlineQueue = this.offlineQueue;
this.resetOfflineQueue();
let item: OfflineQueueItem;
while ((item = offlineQueue.shift())) {
while (!offlineQueue.empty()) {
const item = offlineQueue.front();
offlineQueue.popFront();
this.sendCommand(item.command, item.stream, item.node);
}
}
Expand Down
36 changes: 19 additions & 17 deletions lib/redis/event_handler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"use strict";

import Deque = require("denque");
import { Deque } from "js-sdsl";
import { AbortError } from "redis-errors";
import Command from "../Command";
import { MaxRetriesPerRequestError } from "../errors";
Expand Down Expand Up @@ -115,14 +115,14 @@ function abortError(command: Respondable) {
// aborting and purging we'll have a queue that looks like this: [0, 1, 2]
function abortIncompletePipelines(commandQueue: Deque<CommandItem>) {
let expectedIndex = 0;
for (let i = 0; i < commandQueue.length; ) {
const command = commandQueue.peekAt(i).command as Command;
for (let i = 0; i < commandQueue.size(); ) {
const command = commandQueue.getElementByPos(i).command as Command;
const pipelineIndex = command.pipelineIndex;
if (pipelineIndex === undefined || pipelineIndex === 0) {
expectedIndex = 0;
}
if (pipelineIndex !== undefined && pipelineIndex !== expectedIndex++) {
commandQueue.remove(i, 1);
commandQueue.eraseElementByPos(i);
command.reject(abortError(command));
continue;
}
Expand All @@ -134,18 +134,18 @@ function abortIncompletePipelines(commandQueue: Deque<CommandItem>) {
// we have to abort any transaction fragments that may have ended up in the
// offline queue
function abortTransactionFragments(commandQueue: Deque<CommandItem>) {
for (let i = 0; i < commandQueue.length; ) {
const command = commandQueue.peekAt(i).command as Command;
for (let i = 0; i < commandQueue.size(); ) {
const command = commandQueue.getElementByPos(i).command as Command;
if (command.name === "multi") {
break;
}
if (command.name === "exec") {
commandQueue.remove(i, 1);
commandQueue.eraseElementByPos(i);
command.reject(abortError(command));
break;
}
if ((command as Command).inTransaction) {
commandQueue.remove(i, 1);
commandQueue.eraseElementByPos(i);
command.reject(abortError(command));
} else {
i++;
Expand All @@ -160,11 +160,11 @@ export function closeHandler(self) {
if (!self.prevCondition) {
self.prevCondition = self.condition;
}
if (self.commandQueue.length) {
if (!self.commandQueue.empty()) {
abortIncompletePipelines(self.commandQueue);
self.prevCommandQueue = self.commandQueue;
}
if (self.offlineQueue.length) {
if (!self.offlineQueue.empty()) {
abortTransactionFragments(self.offlineQueue);
}

Expand Down Expand Up @@ -288,9 +288,10 @@ export function readyHandler(self) {

if (self.prevCommandQueue) {
if (self.options.autoResendUnfulfilledCommands) {
debug("resend %d unfulfilled commands", self.prevCommandQueue.length);
while (self.prevCommandQueue.length > 0) {
const item = self.prevCommandQueue.shift();
debug("resend %d unfulfilled commands", self.prevCommandQueue.size());
while (!self.prevCommandQueue.empty()) {
const item = self.prevCommandQueue.front();
self.prevCommandQueue.popFront();
if (
item.select !== self.condition.select &&
item.command.name !== "select"
Expand All @@ -304,12 +305,13 @@ export function readyHandler(self) {
}
}

if (self.offlineQueue.length) {
debug("send %d commands in offline queue", self.offlineQueue.length);
if (!self.offlineQueue.empty()) {
debug("send %d commands in offline queue", self.offlineQueue.size());
const offlineQueue = self.offlineQueue;
self.resetOfflineQueue();
while (offlineQueue.length > 0) {
const item = offlineQueue.shift();
while (!offlineQueue.empty()) {
const item = offlineQueue.front();
offlineQueue.popFront();
if (
item.select !== self.condition.select &&
item.command.name !== "select"
Expand Down
16 changes: 14 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"@ioredis/commands": "^1.1.1",
"cluster-key-slot": "^1.1.0",
"debug": "^4.3.4",
"denque": "^2.0.1",
"js-sdsl": "^4.1.4",
"lodash.defaults": "^4.2.0",
"lodash.isarguments": "^3.1.0",
"redis-errors": "^1.2.0",
Expand Down