Skip to content

Commit

Permalink
test(repeat): fix flaky tests (#1578)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Dec 9, 2022
1 parent 6d9dada commit 8f9eefd
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 66 deletions.
46 changes: 23 additions & 23 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,25 +214,6 @@ export class Worker<

this.id = v4();

this.blockingConnection = new RedisConnection(
isRedisInstance(opts.connection)
? (<Redis>opts.connection).duplicate()
: opts.connection,
);
this.blockingConnection.on('error', error => this.emit('error', error));

this.blockingConnection.on('ready', async () => {
try {
const client = await this.blockingConnection.client;
await client.client('SETNAME', this.clientName(WORKER_SUFFIX));
} catch (error) {
if (!clientCommandMessageReg.test((<Error>error).message)) {
this.emit('error', <Error>error);
}
}
this.emit('ready');
});

if (processor) {
if (typeof processor === 'function') {
this.processFn = processor;
Expand Down Expand Up @@ -267,6 +248,25 @@ export class Worker<
this.run().catch(error => this.emit('error', error));
}
}

this.blockingConnection = new RedisConnection(
isRedisInstance(opts.connection)
? (<Redis>opts.connection).duplicate()
: opts.connection,
);
this.blockingConnection.on('error', error => this.emit('error', error));

this.blockingConnection.on('ready', async () => {
try {
const client = await this.blockingConnection.client;
await client.client('SETNAME', this.clientName(WORKER_SUFFIX));
} catch (error) {
if (!clientCommandMessageReg.test((<Error>error).message)) {
this.emit('error', <Error>error);
}
}
this.emit('ready');
});
}

emit<U extends keyof WorkerListener<DataType, ResultType, NameType>>(
Expand Down Expand Up @@ -456,7 +456,7 @@ export class Worker<
} else {
if (this.limitUntil) {
// TODO: We need to be able to break this delay when we are closing the worker.
await delay(this.limitUntil);
await this.delay(this.limitUntil);
}
return this.moveToActive(token);
}
Expand Down Expand Up @@ -539,8 +539,8 @@ export class Worker<
*
* This function is exposed only for testing purposes.
*/
async delay(): Promise<void> {
await delay(DELAY_TIME_1);
async delay(milliseconds?: number): Promise<void> {
await delay(milliseconds || DELAY_TIME_1);
}

protected async nextJobFromJobData(
Expand Down Expand Up @@ -789,7 +789,7 @@ export class Worker<
} catch (err) {
this.emit('error', <Error>err);
if (delayInMs) {
await delay(delayInMs);
await this.delay(delayInMs);
} else {
return;
}
Expand Down
3 changes: 3 additions & 0 deletions tests/test_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,7 @@ describe('flows', () => {
describe('when continually adding jobs', async () => {
it('adds jobs that do not exists', async () => {
const worker = new Worker(queueName, async () => {}, {
autorun: false,
connection,
});

Expand Down Expand Up @@ -855,6 +856,8 @@ describe('flows', () => {
],
});

worker.run();

await completing1;

const completing2 = new Promise<void>(resolve => {
Expand Down
36 changes: 28 additions & 8 deletions tests/test_getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ describe('Jobs getters', function () {

describe('.getWorkers', () => {
it('gets all workers for this queue only', async function () {
const worker = new Worker(queueName, async () => {}, { connection });
const worker = new Worker(queueName, async () => {}, {
autorun: false,
connection,
});
await new Promise<void>(resolve => {
worker.on('ready', () => {
resolve();
Expand All @@ -57,7 +60,10 @@ describe('Jobs getters', function () {
const workers = await queue.getWorkers();
expect(workers).to.have.length(1);

const worker2 = new Worker(queueName, async () => {}, { connection });
const worker2 = new Worker(queueName, async () => {}, {
autorun: false,
connection,
});
await new Promise<void>(resolve => {
worker2.on('ready', () => {
resolve();
Expand All @@ -74,13 +80,19 @@ describe('Jobs getters', function () {
it('gets only workers related only to one queue', async function () {
const queueName2 = `${queueName}2`;
const queue2 = new Queue(queueName2, { connection });
const worker = new Worker(queueName, async () => {}, { connection });
const worker = new Worker(queueName, async () => {}, {
autorun: false,
connection,
});
await new Promise<void>(resolve => {
worker.on('ready', () => {
resolve();
});
});
const worker2 = new Worker(queueName2, async () => {}, { connection });
const worker2 = new Worker(queueName2, async () => {}, {
autorun: false,
connection,
});
await new Promise<void>(resolve => {
worker2.on('ready', () => {
resolve();
Expand All @@ -103,10 +115,14 @@ describe('Jobs getters', function () {
it('gets same reference for all workers for same queue', async function () {
const ioredisConnection = new IORedis({ maxRetriesPerRequest: null });
const worker = new Worker(queueName, async () => {}, {
autorun: false,
connection: ioredisConnection,
});
await worker.waitUntilReady();
await delay(10);
await new Promise<void>(resolve => {
worker.on('ready', () => {
resolve();
});
});

const workers = await queue.getWorkers();
expect(workers).to.have.length(1);
Expand All @@ -115,7 +131,6 @@ describe('Jobs getters', function () {
connection: ioredisConnection,
});
await worker2.waitUntilReady();
await delay(10);

const nextWorkers = await queue.getWorkers();
expect(nextWorkers).to.have.length(1);
Expand All @@ -129,10 +144,15 @@ describe('Jobs getters', function () {
describe('when disconnection happens', () => {
it('gets all workers even after reconnection', async function () {
const worker = new Worker(queueName, async () => {}, {
autorun: false,
connection,
});
await new Promise<void>(resolve => {
worker.on('ready', () => {
resolve();
});
});
const client = await worker.waitUntilReady();
await delay(10);

const workers = await queue.getWorkers();
expect(workers).to.have.length(1);
Expand Down

0 comments on commit 8f9eefd

Please sign in to comment.