Skip to content

Commit

Permalink
perf(clean): add stopWithinGrace to improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
hsource committed May 27, 2022
1 parent 41ec58e commit 3e64dfc
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 21 deletions.
20 changes: 19 additions & 1 deletion REFERENCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -830,11 +830,29 @@ Returns a promise that resolves to a Metrics object.
### Queue#clean

```ts
clean(grace: number, status?: string, limit?: number): Promise<number[]>
clean(grace: number, status?: string, opts?: CleanOpts): Promise<number[]>
```

Tells the queue remove jobs of a specific type created outside of a grace period.

To ensure the clean finishes within a fairly short period of time, you may want
to provide `opts` to limit the clean operation.

```typescript
interface CleanOpts {
/** How many entries to clean before returning. Defaults to unlimited */
limit?: number;
/**
* A performance optimization causing us to stop as soon as we encounter a
* timestamp within the grace period.
*
* This can be used with `limit` to prevent us from endlessly iterating
* through large queues filled with entries within the grace period.
*/
stopWithinGrace?: boolean;
}
```

#### Example

```js
Expand Down
36 changes: 24 additions & 12 deletions lib/commands/cleanJobsInSet-3.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,21 @@
KEYS[3] rate limiter key
ARGV[1] jobId
ARGV[2] timestamp
ARGV[3] limit the number of jobs to be removed. 0 is unlimited
ARGV[4] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'completed', or 'failed'
ARGV[2] maxTimestamp
ARGV[3] stopAfterMaxTimestamp, can be set as an optimization to stop when we see a time after maxTimestamp
ARGV[4] limit the number of jobs to be removed. 0 is unlimited
ARGV[5] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'completed', or 'failed'
]]

local setKey = KEYS[1]
local priorityKey = KEYS[2]
local rateLimiterKey = KEYS[3]

local jobKeyPrefix = ARGV[1]
local maxTimestamp = ARGV[2]
local limitStr = ARGV[3]
local setName = ARGV[4]
local maxTimestamp = tonumber(ARGV[2])
local stopAfterMaxTimestamp = ARGV[3] == "true"
local limitStr = ARGV[4]
local setName = ARGV[5]

local command = "ZRANGE"
local isList = false
Expand All @@ -37,17 +39,18 @@ local rangeEnd = -1
-- If we're only deleting _n_ items, avoid retrieving all items
-- for faster performance
--
-- Start from the tail of the list, since that's where oldest elements
-- Start from the start of the list, since that's where oldest elements
-- are generally added for FIFO lists
if limit > 0 then
rangeStart = -1 - limit + 1
rangeEnd = -1
rangeStart = 0
rangeEnd = limit
end

local jobIds = rcall(command, setKey, rangeStart, rangeEnd)
local deleted = {}
local deletedCount = 0
local jobTS
local passedMaxTimestamp = false

-- Run this loop:
-- - Once, if limit is -1 or 0
Expand All @@ -68,7 +71,7 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
-- Fetch all three of these (in that order) and use the first one that is set so that we'll leave jobs that have been active within the grace period:
for _, ts in ipairs(rcall("HMGET", jobKey, "finishedOn", "processedOn", "timestamp")) do
if (ts) then
jobTS = ts
jobTS = tonumber(ts)
break
end
end
Expand Down Expand Up @@ -98,6 +101,11 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
deletedCount = deletedCount + 1
table.insert(deleted, jobId)
end

if (jobTS and jobTS > maxTimestamp) then
passedMaxTimestamp = true
break
end
end
end

Expand All @@ -107,10 +115,14 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
break
end

if maxTimestamp and stopAfterMaxTimestamp and passedMaxTimestamp then
break
end

if deletedCount < limit then
-- We didn't delete enough. Look for more to delete
rangeStart = rangeStart - limit
rangeEnd = rangeEnd - limit
rangeStart = rangeStart + limit
rangeEnd = rangeEnd + limit
jobIds = rcall(command, setKey, rangeStart, rangeEnd)
end
end
Expand Down
28 changes: 22 additions & 6 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -1246,16 +1246,19 @@ Queue.prototype.toKey = function(queueType) {
return [this.keyPrefix, this.name, queueType].join(':');
};

/*@function clean
*
/**
* Cleans jobs from a queue. Similar to remove but keeps jobs within a certain
* grace period.
*
* @param {int} grace - The grace period
* @param {number} grace - The grace period in milliseconds
* @param {string} [type=completed] - The type of job to clean. Possible values are completed, wait, active, paused, delayed, failed. Defaults to completed.
* @param {int} The max number of jobs to clean
* @param {number} opts.limit - The maximum number of jobs to clean
* @param {boolean} opts.stopWithinGrace - A performance optimization causing us
* to stop as soon as we encounter a timestamp within the grace period. This can
* be used with `limit` to prevent us from endlessly iterating through large
* queues filled with entries within the grace period.
*/
Queue.prototype.clean = function(grace, type, limit) {
Queue.prototype.clean = function(grace, type, opts) {
return this.isReady().then(() => {
if (grace === undefined || grace === null) {
throw new Error('You must define a grace period.');
Expand All @@ -1265,6 +1268,19 @@ Queue.prototype.clean = function(grace, type, limit) {
type = 'completed';
}

let innerOpts;
if (typeof opts === 'number') {
// Pre-4.8.3, the 3rd argument was a number argument for limits
innerOpts = { limit: opts };
} else if (opts) {
innerOpts = {
limit: opts.limit,
stopAfterMaxTimestamp: opts.stopWithinGrace,
};
} else {
innerOpts = {};
}

if (
_.indexOf(
['completed', 'wait', 'active', 'paused', 'delayed', 'failed'],
Expand All @@ -1275,7 +1291,7 @@ Queue.prototype.clean = function(grace, type, limit) {
}

return scripts
.cleanJobsInSet(this, type, Date.now() - grace, limit)
.cleanJobsInSet(this, type, Date.now() - grace, innerOpts)
.then(jobs => {
utils.emitSafe(this, 'cleaned', jobs, type);
return jobs;
Expand Down
5 changes: 3 additions & 2 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -439,14 +439,15 @@ const scripts = {
return queue.client.moveStalledJobsToWait(keys.concat(args));
},

cleanJobsInSet(queue, set, ts, limit) {
cleanJobsInSet(queue, set, ts, opts) {
return queue.client.cleanJobsInSet([
queue.toKey(set),
queue.toKey('priority'),
queue.keys.limiter,
queue.toKey(''),
ts,
limit || 0,
opts.stopAfterMaxTimestamp || false,
opts.limit || 0,
set
]);
},
Expand Down

0 comments on commit 3e64dfc

Please sign in to comment.