Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
leibale committed Oct 18, 2021
1 parent c4d10a3 commit 47799f5
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 25 deletions.
20 changes: 10 additions & 10 deletions lib/client/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,19 @@ describe('Client', () => {

assert.equal(client.isOpen, false);
});
});

describe('authWithDatabaseSelect', () => {
const database = 2
itWithClient(TestRedisServers.PASSWORD, 'Client should auth success and select index 2', async client => {
itWithClient(TestRedisServers.PASSWORD, 'should execute AUTH before SELECT', async client => {
assert.equal(
await client.ping(),
'PONG'
(await client.clientInfo()).db,
2
);
let info = await client.clientInfo()
assert.equal(info.db, database)
}, undefined, { database });
})
}, {
minimumRedisVersion: [6, 2],
clientOptions: {
database: 2
}
});
});

describe('legacyMode', () => {
const client = RedisClient.create({
Expand Down
38 changes: 29 additions & 9 deletions lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,24 +177,44 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>

#initiateSocket(): RedisSocket {
const socketInitiator = async (): Promise<void> => {
const v4Commands = this.#options?.legacyMode ? this.#v4 : this,
promises = [];

if (this.#options?.username || this.#options?.password) {
await v4Commands.auth(RedisClient.commandOptions({ asap: true }), this.#options);
}
const promises = [];

if (this.#selectedDB !== 0) {
promises.push(v4Commands.select(RedisClient.commandOptions({ asap: true }), this.#selectedDB));
promises.push(
this.#queue.addCommand(
['SELECT', this.#selectedDB.toString()],
{ asap: true }
)
);
}

if (this.#options?.readonly) {
promises.push(v4Commands.readonly(RedisClient.commandOptions({ asap: true })));
promises.push(
this.#queue.addCommand(
COMMANDS.READONLY.transformArguments(),
{ asap: true }
)
);
}

if (this.#options?.username || this.#options?.password) {
promises.push(
this.#queue.addCommand(
COMMANDS.AUTH.transformArguments({
username: this.#options.username,
password: this.#options.password ?? ''
}),
{ asap: true }
)
);
}

const resubscribePromise = this.#queue.resubscribe();
if (resubscribePromise) {
promises.push(resubscribePromise);
}

if (promises.length) {
this.#tick();
}

Expand Down Expand Up @@ -410,7 +430,7 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>
quit = this.QUIT;

#tick(): void {
if (!this.#socket.isSocketExists) {
if (!this.#socket.isSocketExists || this.#socket.writableNeedDrain) {
return;
}

Expand Down
17 changes: 15 additions & 2 deletions lib/client/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ export default class RedisSocket extends EventEmitter {
return !!this.#socket;
}

// `writable.writableNeedDrain` was added in v15.2.0 and therefore can't be used
// https://nodejs.org/api/stream.html#stream_writable_writableneeddrain
#writableNeedDrain = false;

get writableNeedDrain(): boolean {
return this.#writableNeedDrain;
}

constructor(initiator?: RedisSocketInitiator, options?: RedisSocketOptions) {
super();

Expand Down Expand Up @@ -163,7 +171,10 @@ export default class RedisSocket extends EventEmitter {
this.#onSocketError(new Error('Socket closed unexpectedly'));
}
})
.on('drain', () => this.emit('drain'))
.on('drain', () => {
this.#writableNeedDrain = false;
this.emit('drain');
})
.on('data', (data: Buffer) => this.emit('data', data));

resolve(socket);
Expand Down Expand Up @@ -198,7 +209,9 @@ export default class RedisSocket extends EventEmitter {
throw new ClientClosedError();
}

return this.#socket.write(toWrite);
const wasFullyWritten = this.#socket.write(toWrite);
this.#writableNeedDrain = !wasFullyWritten;
return wasFullyWritten;
}

async disconnect(ignoreIsOpen = false): Promise<void> {
Expand Down
14 changes: 10 additions & 4 deletions lib/test-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,17 +284,23 @@ export function describeHandleMinimumRedisVersion(minimumVersion: PartialRedisVe
});
}

interface RedisClientTestOptions extends RedisTestOptions {
clientOptions?: RedisClientOptions<{}, {}>;
}

export function itWithClient(
type: TestRedisServers,
title: string,
fn: (client: RedisClientType) => Promise<void>,
options?: RedisTestOptions,
clientOptions?: RedisClientOptions<{}, {}>
options?: RedisClientTestOptions
): void {
it(title, async function () {
if (handleMinimumRedisVersion(this, options?.minimumRedisVersion)) return;

const client = RedisClient.create(Object.assign({}, TEST_REDIS_SERVERS[type], clientOptions));

const client = RedisClient.create({
...TEST_REDIS_SERVERS[type],
...options?.clientOptions
});

await client.connect();

Expand Down

0 comments on commit 47799f5

Please sign in to comment.