Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Speed up performance of queue.clean when called with a limit #2205

Merged
merged 5 commits into from Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
104 changes: 73 additions & 31 deletions lib/commands/cleanJobsInSet-3.lua
Expand Up @@ -11,52 +11,94 @@
ARGV[3] limit the number of jobs to be removed. 0 is unlimited
ARGV[4] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'completed', or 'failed'
]]

local setKey = KEYS[1]
local priorityKey = KEYS[2]
local rateLimiterKey = KEYS[3]

local jobKeyPrefix = ARGV[1]
local maxTimestamp = ARGV[2]
local limitStr = ARGV[3]
local setName = ARGV[4]

local command = "ZRANGE"
local isList = false
local rcall = redis.call

if ARGV[4] == "wait" or ARGV[4] == "active" or ARGV[4] == "paused" then
if setName == "wait" or setName == "active" or setName == "paused" then
command = "LRANGE"
isList = true
end

local jobIds = rcall(command, KEYS[1], 0, -1)
local limit = tonumber(limitStr)
local rangeStart = 0
local rangeEnd = -1

-- If we're only deleting _n_ items, avoid retrieving all items
-- for faster performance
--
-- Start from the tail of the list, since that's where oldest elements
-- are generally added for FIFO lists
if limit > 0 then
rangeStart = -1 - limit + 1
rangeEnd = -1
end

local jobIds = rcall(command, setKey, rangeStart, rangeEnd)
local deleted = {}
local deletedCount = 0
local limit = tonumber(ARGV[3])
local jobTS
for _, jobId in ipairs(jobIds) do
if limit > 0 and deletedCount >= limit then
break
end

local jobKey = ARGV[1] .. jobId
if (rcall("EXISTS", jobKey .. ":lock") == 0) then
jobTS = rcall("HGET", jobKey, "timestamp")
if (not jobTS or jobTS < ARGV[2]) then
if isList then
rcall("LREM", KEYS[1], 0, jobId)
else
rcall("ZREM", KEYS[1], jobId)
end
rcall("ZREM", KEYS[2], jobId)
rcall("DEL", jobKey)
rcall("DEL", jobKey .. ":logs")

-- delete keys related to rate limiter
-- NOTE: this code is unncessary for other sets than wait, paused and delayed.
local limiterIndexTable = KEYS[3] .. ":index"
local limitedSetKey = rcall("HGET", limiterIndexTable, jobId)

if limitedSetKey then
rcall("SREM", limitedSetKey, jobId)
rcall("HDEL", limiterIndexTable, jobId)
end
-- Run this loop:
-- - Once, if limit is -1 or 0
-- - As many times as needed if limit is positive
while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
for _, jobId in ipairs(jobIds) do
if limit > 0 and deletedCount >= limit then
break
end

local jobKey = jobKeyPrefix .. jobId
if (rcall("EXISTS", jobKey .. ":lock") == 0) then
jobTS = rcall("HGET", jobKey, "timestamp")
if (not jobTS or jobTS < maxTimestamp) then
if isList then
rcall("LREM", setKey, 0, jobId)
else
rcall("ZREM", setKey, jobId)
end
rcall("ZREM", priorityKey, jobId)
rcall("DEL", jobKey)
rcall("DEL", jobKey .. ":logs")

-- delete keys related to rate limiter
-- NOTE: this code is unncessary for other sets than wait, paused and delayed.
local limiterIndexTable = rateLimiterKey .. ":index"
local limitedSetKey = rcall("HGET", limiterIndexTable, jobId)

deletedCount = deletedCount + 1
table.insert(deleted, jobId)
if limitedSetKey then
rcall("SREM", limitedSetKey, jobId)
rcall("HDEL", limiterIndexTable, jobId)
end

deletedCount = deletedCount + 1
table.insert(deleted, jobId)
end
end
end

-- If we didn't have a limit, return immediately. We should have deleted
-- all the jobs we can
if limit <= 0 then
break
end

if deletedCount < limit then
-- We didn't delete enough. Look for more to delete
rangeStart = rangeStart - limit
rangeEnd = rangeEnd - limit
jobIds = rcall(command, setKey, rangeStart, rangeEnd)
end
end

return deleted
28 changes: 28 additions & 0 deletions test/test_queue.js
Expand Up @@ -2837,6 +2837,34 @@ describe('Queue', () => {
});
});

it('should clean the number of jobs requested even if first jobs timestamp doesn\'t match', async () => {
// This job shouldn't get deleted due to the 5000 grace
await queue.add({ some: 'data' });
// This job should get cleaned since 10000 > 5000 grace
const jobToClean = await queue.add({ some: 'data' }, { timestamp: Date.now() - 10000 });
// This job shouldn't get deleted due to the 5000 grace
await queue.add({ some: 'data' });

const cleaned = await queue.clean(5000, 'wait', 1);
expect(cleaned.length).to.be.eql(1);
expect(cleaned[0]).to.eql(jobToClean.id);

const len = await queue.count();
expect(len).to.be.eql(2);
});

it('shouldn\'t clean anything if all jobs are in grace period', async () => {
await queue.add({ some: 'data' });
await queue.add({ some: 'data' });

const cleaned = await queue.clean(5000, 'wait', 1);
expect(cleaned.length).to.be.eql(0);

const cleaned2 = await queue.clean(5000, 'wait');
expect(cleaned2.length).to.be.eql(0);
expect(await queue.count()).to.be.eql(2);
});

it('should properly clean jobs from the priority set', done => {
const client = new redis(6379, '127.0.0.1', {});
queue.add({ some: 'data' }, { priority: 5 });
Expand Down