Skip to content

Commit

Permalink
perf: speed up performance of queue.clean when called with a limit (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
pxpeterxu committed Dec 14, 2021
1 parent 57f24df commit c20e469
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 31 deletions.
104 changes: 73 additions & 31 deletions lib/commands/cleanJobsInSet-3.lua
Expand Up @@ -11,52 +11,94 @@
ARGV[3] limit the number of jobs to be removed. 0 is unlimited
ARGV[4] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'completed', or 'failed'
]]

local setKey = KEYS[1]          -- list/zset holding the job ids to clean
local priorityKey = KEYS[2]     -- priority zset; cleaned jobs are removed from it too
local rateLimiterKey = KEYS[3]  -- base key for the rate-limiter bookkeeping

local jobKeyPrefix = ARGV[1]    -- prefix prepended to a job id to form its hash key
local maxTimestamp = ARGV[2]    -- jobs with a timestamp below this may be removed
local limitStr = ARGV[3]        -- max number of jobs to remove; "0" means unlimited
local setName = ARGV[4]         -- 'wait' | 'active' | 'paused' | 'delayed' | 'completed' | 'failed'

-- 'wait', 'active' and 'paused' are Redis lists; the other sets are zsets,
-- so the matching range command is selected below.
local command = "ZRANGE"
local isList = false
local rcall = redis.call        -- localize the global lookup for the hot loops below

if ARGV[4] == "wait" or ARGV[4] == "active" or ARGV[4] == "paused" then
if setName == "wait" or setName == "active" or setName == "paused" then
command = "LRANGE"
isList = true
end

local jobIds = rcall(command, KEYS[1], 0, -1)
local limit = tonumber(limitStr)

-- By default fetch the entire list/zset.
local rangeStart = 0
local rangeEnd = -1

-- When only _n_ items will be deleted, restrict the fetch to the last
-- `limit` elements instead of retrieving everything — much faster on
-- large sets.
--
-- The tail is scanned first because that is where the oldest elements
-- generally sit for FIFO lists.
if limit > 0 then
  rangeEnd = -1
  rangeStart = rangeEnd - (limit - 1)
end

local jobIds = rcall(command, setKey, rangeStart, rangeEnd)
local deleted = {}              -- job ids actually removed; returned to the caller
local deletedCount = 0
local limit = tonumber(ARGV[3]) -- NOTE(review): shadows the `limit` declared above with the same value — looks like a merge leftover; confirm and remove
local jobTS                     -- per-iteration holder for the job's stored timestamp
for _, jobId in ipairs(jobIds) do
if limit > 0 and deletedCount >= limit then
break
end

local jobKey = ARGV[1] .. jobId
if (rcall("EXISTS", jobKey .. ":lock") == 0) then
jobTS = rcall("HGET", jobKey, "timestamp")
if (not jobTS or jobTS < ARGV[2]) then
if isList then
rcall("LREM", KEYS[1], 0, jobId)
else
rcall("ZREM", KEYS[1], jobId)
end
rcall("ZREM", KEYS[2], jobId)
rcall("DEL", jobKey)
rcall("DEL", jobKey .. ":logs")

-- delete keys related to rate limiter
-- NOTE: this code is unncessary for other sets than wait, paused and delayed.
local limiterIndexTable = KEYS[3] .. ":index"
local limitedSetKey = rcall("HGET", limiterIndexTable, jobId)

if limitedSetKey then
rcall("SREM", limitedSetKey, jobId)
rcall("HDEL", limiterIndexTable, jobId)
end
-- Delete eligible jobs from the fetched window, re-fetching earlier
-- windows while a positive `limit` has not yet been reached.
--
-- Runs:
-- - Once, if limit is -1 or 0 (the initial fetch already covered everything)
-- - As many times as needed if limit is positive
--
-- `keptCount` counts jobs that were examined but NOT deleted (locked, or
-- still inside the grace period). Because each window is a tail segment,
-- those survivors accumulate at the tail of the list/zset, so the next
-- window must end just before them. The previous code shifted the window
-- by a fixed `limit` instead, which skipped never-examined elements
-- whenever a pass deleted only part of its window (deletions shrink the
-- underlying list/zset and shift the remaining elements toward the tail).
local keptCount = 0
while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
  for _, jobId in ipairs(jobIds) do
    if limit > 0 and deletedCount >= limit then
      break
    end

    local jobKey = jobKeyPrefix .. jobId
    -- A job with a lock key is being processed right now; never clean it.
    if (rcall("EXISTS", jobKey .. ":lock") == 0) then
      jobTS = rcall("HGET", jobKey, "timestamp")
      -- NOTE(review): HGET returns a string, so this is a lexicographic
      -- comparison, not numeric — correct only while both timestamps have
      -- the same digit count; confirm that assumption holds.
      if (not jobTS or jobTS < maxTimestamp) then
        if isList then
          rcall("LREM", setKey, 0, jobId)
        else
          rcall("ZREM", setKey, jobId)
        end
        rcall("ZREM", priorityKey, jobId)
        rcall("DEL", jobKey)
        rcall("DEL", jobKey .. ":logs")

        -- delete keys related to rate limiter
        -- NOTE: this code is unnecessary for other sets than wait, paused and delayed.
        local limiterIndexTable = rateLimiterKey .. ":index"
        local limitedSetKey = rcall("HGET", limiterIndexTable, jobId)

        if limitedSetKey then
          rcall("SREM", limitedSetKey, jobId)
          rcall("HDEL", limiterIndexTable, jobId)
        end

        deletedCount = deletedCount + 1
        table.insert(deleted, jobId)
      else
        -- In grace period: examined but kept; it stays at the tail.
        keptCount = keptCount + 1
      end
    else
      -- Locked: examined but kept; it stays at the tail.
      keptCount = keptCount + 1
    end
  end

  -- If we didn't have a limit, return immediately. We should have deleted
  -- all the jobs we can
  if limit <= 0 then
    break
  end

  if deletedCount < limit then
    -- We didn't delete enough. Fetch the next window of unexamined
    -- elements: it ends right before the `keptCount` survivors at the
    -- tail. Redis clamps out-of-range negative indexes, so this returns
    -- the remaining head (or nothing, ending the loop) near the start.
    rangeStart = -(keptCount + limit)
    rangeEnd = -(keptCount + 1)
    jobIds = rcall(command, setKey, rangeStart, rangeEnd)
  end
end

return deleted
28 changes: 28 additions & 0 deletions test/test_queue.js
Expand Up @@ -2837,6 +2837,34 @@ describe('Queue', () => {
});
});

it('should clean the number of jobs requested even if first jobs timestamp doesn\'t match', async () => {
  // Surround one stale job with two fresh ones: only the stale job is
  // outside the 5000ms grace window and therefore eligible for cleaning.
  await queue.add({ some: 'data' });
  const staleJob = await queue.add(
    { some: 'data' },
    { timestamp: Date.now() - 10000 }
  );
  await queue.add({ some: 'data' });

  // Ask clean() for at most one job with a 5000ms grace period.
  const removed = await queue.clean(5000, 'wait', 1);
  expect(removed.length).to.be.eql(1);
  expect(removed[0]).to.eql(staleJob.id);

  // Both in-grace jobs must still be queued.
  const remaining = await queue.count();
  expect(remaining).to.be.eql(2);
});

it('shouldn\'t clean anything if all jobs are in grace period', async () => {
  // Both jobs are newer than the 5000ms grace window.
  await queue.add({ some: 'data' });
  await queue.add({ some: 'data' });

  // A limited clean must remove nothing...
  const removedLimited = await queue.clean(5000, 'wait', 1);
  expect(removedLimited.length).to.be.eql(0);

  // ...and neither must an unlimited clean.
  const removedUnlimited = await queue.clean(5000, 'wait');
  expect(removedUnlimited.length).to.be.eql(0);
  expect(await queue.count()).to.be.eql(2);
});

it('should properly clean jobs from the priority set', done => {
const client = new redis(6379, '127.0.0.1', {});
queue.add({ some: 'data' }, { priority: 5 });
Expand Down

0 comments on commit c20e469

Please sign in to comment.