Skip to content

Commit

Permalink
chore: cleanup whenCurrentJobsFinished
Browse files Browse the repository at this point in the history
Cleanup and document whenCurrentJobsFinished. And since it's now
official API, add some tests for it.
  • Loading branch information
gabegorelick committed Nov 6, 2019
1 parent d99f7f6 commit d13d99d
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 43 deletions.
17 changes: 15 additions & 2 deletions REFERENCE.md
Expand Up @@ -6,6 +6,7 @@
- [Queue#add](#queueadd)
- [Queue#pause](#queuepause)
- [Queue#resume](#queueresume)
- [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished)
- [Queue#count](#queuecount)
- [Queue#empty](#queueempty)
- [Queue#clean](#queueclean)
Expand Down Expand Up @@ -149,7 +150,7 @@ process(name: string, concurrency: number, processor: ((job, done?) => Promise<a

Defines a processing function for the jobs in a given Queue.

The callback is called everytime a job is placed in the queue. It is passed an instance of the job as first argument.
The callback is called every time a job is placed in the queue. It is passed an instance of the job as first argument.

If the callback signature contains the second optional `done` argument, the callback will be passed a `done` callback to be called after the job has been completed. The `done` callback can be called with an Error instance, to signal that the job did not complete successfully, or with a result as second argument (e.g.: `done(null, result);`) when the job is successful. Errors will be passed as a second argument to the "failed" event;
results, as a second argument to the "completed" event.
Expand Down Expand Up @@ -301,11 +302,13 @@ interface BackoffOpts {
### Queue#pause

```ts
pause(isLocal?: boolean): Promise
pause(isLocal?: boolean, doNotWaitActive?: boolean): Promise
```

Returns a promise that resolves when the queue is paused. A paused queue will not process new jobs until resumed, but current jobs being processed will continue until they are finalized. The pause can be either global or local. If global, all workers in all queue instances for a given queue will be paused. If local, just this worker will stop processing new jobs after the current lock expires. This can be useful to stop a worker from taking new jobs prior to shutting down.

If `doNotWaitActive` is `true`, `pause` will *not* wait for any active jobs to finish before resolving. Otherwise, `pause` *will* wait for active jobs to finish. See [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished) for more information.

Pausing a queue that is already paused does nothing.

---
Expand All @@ -322,6 +325,16 @@ Resuming a queue that is not paused does nothing.

---

### Queue#whenCurrentJobsFinished

```ts
whenCurrentJobsFinished(): Promise<Void>
```

Returns a promise that resolves when all jobs currently being processed by this worker have finished.

---

### Queue#count

```ts
Expand Down
53 changes: 12 additions & 41 deletions lib/queue.js
Expand Up @@ -1174,49 +1174,20 @@ Queue.prototype.clean = function(grace, type, limit) {
* @returns {Promise}
*/
Queue.prototype.whenCurrentJobsFinished = function() {
return new Promise((resolve, reject) => {
if (!this.bclientInitialized) {
// bclient not yet initialized, so no jobs to wait for
return resolve();
}

//
// Force reconnection of blocking connection to abort blocking redis call immediately.
//
const forcedReconnection = redisClientDisconnect(this.bclient).then(() => {
return this.bclient.connect();
});
if (!this.bclientInitialized) {
// bclient not yet initialized, so no jobs to wait for
return Promise.resolve();
}

Promise.all(this.processing)
.then(() => {
return forcedReconnection;
})
.then(resolve, reject);

/*
this.bclient.disconnect();
this.bclient.once('end', function(){
console.error('ENDED!');
setTimeout(function(){
this.bclient.connect();
}, 0);
});
//
// Force reconnection of blocking connection to abort blocking redis call immediately.
//
const forcedReconnection = redisClientDisconnect(this.bclient).then(() => {
return this.bclient.connect();
});

/*
var stream = this.bclient.connector.stream;
if(stream){
stream.on('finish', function(){
console.error('FINISHED!');
this.bclient.connect();
});
stream.on('error', function(err){
console.error('errir', err);
this.bclient.connect();
});
this.bclient.connect();
}
*/
//this.bclient.connect();
return Promise.all([this.processing[0]]).then(() => {
return forcedReconnection;
});
};

Expand Down
97 changes: 97 additions & 0 deletions test/test_when_current_jobs_finished.js
@@ -0,0 +1,97 @@
'use strict';

const expect = require('chai').expect;
const redis = require('ioredis');
const utils = require('./utils');
const delay = require('delay');
const sinon = require('sinon');

describe('.whenCurrentJobsFinished', () => {
let client;
beforeEach(() => {
client = new redis();
return client.flushdb();
});

afterEach(async () => {
sinon.restore();
await utils.cleanupQueues();
await client.flushdb();
return client.quit();
});

it('should handle queue with no processor', async () => {
const queue = await utils.newQueue();
expect(await queue.whenCurrentJobsFinished()).to.equal(undefined);
});

it('should handle queue with no jobs', async () => {
const queue = await utils.newQueue();
queue.process(() => Promise.resolve());
expect(await queue.whenCurrentJobsFinished()).to.equal(undefined);
});

it('should wait for job to complete', async () => {
const queue = await utils.newQueue();
await queue.add({});

let finishJob;

// wait for job to be active
await new Promise(resolve => {
queue.process(() => {
resolve();

return new Promise(resolve => {
finishJob = resolve;
});
});
});

let isFulfilled = false;
const finished = queue.whenCurrentJobsFinished().then(() => {
isFulfilled = true;
});

await delay(100);
expect(isFulfilled).to.equal(false);

finishJob();
expect(await finished).to.equal(
undefined,
'whenCurrentJobsFinished should resolve once jobs are finished'
);
});

it('should wait for job to fail', async () => {
const queue = await utils.newQueue();
await queue.add({});

let rejectJob;

// wait for job to be active
await new Promise(resolve => {
queue.process(() => {
resolve();

return new Promise((resolve, reject) => {
rejectJob = reject;
});
});
});

let isFulfilled = false;
const finished = queue.whenCurrentJobsFinished().then(() => {
isFulfilled = true;
});

await delay(100);
expect(isFulfilled).to.equal(false);

rejectJob();
expect(await finished).to.equal(
undefined,
'whenCurrentJobsFinished should resolve once jobs are finished'
);
});
});

0 comments on commit d13d99d

Please sign in to comment.