From f9e014298bd193aed273c0f19fcae7cc71857c6f Mon Sep 17 00:00:00 2001 From: Peter Xu Date: Thu, 4 Nov 2021 22:00:04 -0700 Subject: [PATCH 1/5] Speed up performance of queue.clean when called with a limit --- lib/commands/cleanJobsInSet-3.lua | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/lib/commands/cleanJobsInSet-3.lua b/lib/commands/cleanJobsInSet-3.lua index 7de75c3a7..577b7f3bf 100644 --- a/lib/commands/cleanJobsInSet-3.lua +++ b/lib/commands/cleanJobsInSet-3.lua @@ -20,10 +20,18 @@ if ARGV[4] == "wait" or ARGV[4] == "active" or ARGV[4] == "paused" then isList = true end -local jobIds = rcall(command, KEYS[1], 0, -1) +local limit = tonumber(ARGV[3]) +local range_limit = -1 + +-- If we're only deleting _n_ items, avoid retrieving all items +-- for faster performance +if limit > 0 then + range_limit = limit +end + +local jobIds = rcall(command, KEYS[1], 0, range_limit) local deleted = {} local deletedCount = 0 -local limit = tonumber(ARGV[3]) local jobTS for _, jobId in ipairs(jobIds) do if limit > 0 and deletedCount >= limit then From 6dfcf3a020a37d075512f4ebc15a7ac6334f733e Mon Sep 17 00:00:00 2001 From: Peter Xu Date: Fri, 5 Nov 2021 10:18:54 -0700 Subject: [PATCH 2/5] Run loop multiple times if limit doesn't get enough jobs to delete --- lib/commands/cleanJobsInSet-3.lua | 91 ++++++++++++++++++++----------- 1 file changed, 60 insertions(+), 31 deletions(-) diff --git a/lib/commands/cleanJobsInSet-3.lua b/lib/commands/cleanJobsInSet-3.lua index 577b7f3bf..058f3ceae 100644 --- a/lib/commands/cleanJobsInSet-3.lua +++ b/lib/commands/cleanJobsInSet-3.lua @@ -11,17 +11,28 @@ ARGV[3] limit the number of jobs to be removed. 0 is unlimited ARGV[4] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'completed', or 'failed' ]] + +local setKey = KEYS[1] +local priorityKey = KEYS[2] +local rateLimiterKey = KEYS[3] + +local jobKeyPrefix = ARGV[1] +local maxTimestamp = ARGV[2] +local limitStr = ARGV[3] +local setName = ARGV[4] + local command = "ZRANGE" local isList = false local rcall = redis.call -if ARGV[4] == "wait" or ARGV[4] == "active" or ARGV[4] == "paused" then +if setName == "wait" or setName == "active" or setName == "paused" then command = "LRANGE" isList = true end -local limit = tonumber(ARGV[3]) +local limit = tonumber(limitStr) local range_limit = -1 +local range_start = 0 -- If we're only deleting _n_ items, avoid retrieving all items -- for faster performance @@ -29,42 +40,60 @@ if limit > 0 then range_limit = limit end -local jobIds = rcall(command, KEYS[1], 0, range_limit) +local jobIds = rcall(command, setKey, range_start, range_limit) local deleted = {} local deletedCount = 0 local jobTS -for _, jobId in ipairs(jobIds) do - if limit > 0 and deletedCount >= limit then - break - end - local jobKey = ARGV[1] .. jobId - if (rcall("EXISTS", jobKey .. ":lock") == 0) then - jobTS = rcall("HGET", jobKey, "timestamp") - if (not jobTS or jobTS < ARGV[2]) then - if isList then - rcall("LREM", KEYS[1], 0, jobId) - else - rcall("ZREM", KEYS[1], jobId) - end - rcall("ZREM", KEYS[2], jobId) - rcall("DEL", jobKey) - rcall("DEL", jobKey .. ":logs") - - -- delete keys related to rate limiter - -- NOTE: this code is unncessary for other sets than wait, paused and delayed. - local limiterIndexTable = KEYS[3] .. ":index" - local limitedSetKey = rcall("HGET", limiterIndexTable, jobId) - - if limitedSetKey then - rcall("SREM", limitedSetKey, jobId) - rcall("HDEL", limiterIndexTable, jobId) - end +-- Run this loop: +-- - Once, if limit is -1 or 0 +-- - As many times as needed if limit is positive +while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do + for _, jobId in ipairs(jobIds) do + if limit > 0 and deletedCount >= limit then + break + end + + local jobKey = jobKeyPrefix .. jobId + if (rcall("EXISTS", jobKey .. ":lock") == 0) then + jobTS = rcall("HGET", jobKey, "timestamp") + if (not jobTS or jobTS < maxTimestamp) then + if isList then + rcall("LREM", setKey, 0, jobId) + else + rcall("ZREM", setKey, jobId) + end + rcall("ZREM", priorityKey, jobId) + rcall("DEL", jobKey) + rcall("DEL", jobKey .. ":logs") - deletedCount = deletedCount + 1 - table.insert(deleted, jobId) + -- delete keys related to rate limiter + -- NOTE: this code is unncessary for other sets than wait, paused and delayed. + local limiterIndexTable = rateLimiterKey .. ":index" + local limitedSetKey = rcall("HGET", limiterIndexTable, jobId) + + if limitedSetKey then + rcall("SREM", limitedSetKey, jobId) + rcall("HDEL", limiterIndexTable, jobId) + end + + deletedCount = deletedCount + 1 + table.insert(deleted, jobId) + end end end + + -- If we didn't have a limit, return immediately. We should have deleted + -- all the jobs we can + if limit <= 0 then + break + end + + if deletedCount < limit then + -- We didn't delete enough. Look for more to delete + range_start = range_start + range_limit + jobIds = rcall(command, setKey, range_start, range_limit) + end end return deleted From b58bfaf168ee33e50a56656c4cae81377bc2c3ab Mon Sep 17 00:00:00 2001 From: Peter Xu Date: Fri, 5 Nov 2021 15:34:27 -0700 Subject: [PATCH 3/5] Standardize variable naming --- lib/commands/cleanJobsInSet-3.lua | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/commands/cleanJobsInSet-3.lua b/lib/commands/cleanJobsInSet-3.lua index 058f3ceae..b6c5c3bab 100644 --- a/lib/commands/cleanJobsInSet-3.lua +++ b/lib/commands/cleanJobsInSet-3.lua @@ -31,16 +31,16 @@ if setName == "wait" or setName == "active" or setName == "paused" then end local limit = tonumber(limitStr) -local range_limit = -1 -local range_start = 0 +local rangeLimit = -1 +local rangeStart = 0 -- If we're only deleting _n_ items, avoid retrieving all items -- for faster performance if limit > 0 then - range_limit = limit + rangeLimit = limit end -local jobIds = rcall(command, setKey, range_start, range_limit) +local jobIds = rcall(command, setKey, rangeStart, rangeLimit) local deleted = {} local deletedCount = 0 local jobTS @@ -91,8 +91,8 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do if deletedCount < limit then -- We didn't delete enough. Look for more to delete - range_start = range_start + range_limit - jobIds = rcall(command, setKey, range_start, range_limit) + rangeStart = rangeStart + rangeLimit + jobIds = rcall(command, setKey, rangeStart, rangeLimit) end end From cc8447ffacef57eaad61c8ac8cf910d7dcd0d631 Mon Sep 17 00:00:00 2001 From: Peter Xu Date: Sat, 6 Nov 2021 12:00:41 -0700 Subject: [PATCH 4/5] Add tests and fix bugs in script --- lib/commands/cleanJobsInSet-3.lua | 11 ++++++----- test/test_queue.js | 16 ++++++++++++++++ 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/lib/commands/cleanJobsInSet-3.lua b/lib/commands/cleanJobsInSet-3.lua index b6c5c3bab..0767885e3 100644 --- a/lib/commands/cleanJobsInSet-3.lua +++ b/lib/commands/cleanJobsInSet-3.lua @@ -31,16 +31,16 @@ if setName == "wait" or setName == "active" or setName == "paused" then end local limit = tonumber(limitStr) -local rangeLimit = -1 local rangeStart = 0 +local rangeEnd = -1 -- If we're only deleting _n_ items, avoid retrieving all items -- for faster performance if limit > 0 then - rangeLimit = limit + rangeEnd = limit - 1 end -local jobIds = rcall(command, setKey, rangeStart, rangeLimit) +local jobIds = rcall(command, setKey, rangeStart, rangeEnd) local deleted = {} local deletedCount = 0 local jobTS @@ -91,8 +91,9 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do if deletedCount < limit then -- We didn't delete enough. Look for more to delete - rangeStart = rangeStart + rangeLimit - jobIds = rcall(command, setKey, rangeStart, rangeLimit) + rangeStart = rangeStart + limit + rangeEnd = rangeEnd + limit + jobIds = rcall(command, setKey, rangeStart, rangeEnd) end end diff --git a/test/test_queue.js b/test/test_queue.js index f211b0f43..93203faf8 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -2837,6 +2837,22 @@ describe('Queue', () => { }); }); + it('should clean the number of jobs requested even if first jobs timestamp doesn\'t match', async () => { + // This job shouldn't get deleted due to the 5000 grace + await queue.add({ some: 'data' }); + // This job should get cleaned since 10000 > 5000 grace + const jobToClean = await queue.add({ some: 'data' }, { timestamp: Date.now() - 10000 }); + // This job shouldn't get deleted due to the 5000 grace + await queue.add({ some: 'data' }); + + const cleaned = await queue.clean(5000, 'wait', 1); + expect(cleaned.length).to.be.eql(1); + expect(cleaned[0]).to.eql(jobToClean.id); + + const len = await queue.count(); + expect(len).to.be.eql(2); + }); + it('should properly clean jobs from the priority set', done => { const client = new redis(6379, '127.0.0.1', {}); queue.add({ some: 'data' }, { priority: 5 }); From 7fc3750de67d5b09a89cf6f3e56fb2aa5fb4c698 Mon Sep 17 00:00:00 2001 From: Peter Xu Date: Sat, 6 Nov 2021 15:00:08 -0700 Subject: [PATCH 5/5] Improve performance by usually starting from earliest-added items --- lib/commands/cleanJobsInSet-3.lua | 10 +++++++--- test/test_queue.js | 12 ++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/lib/commands/cleanJobsInSet-3.lua b/lib/commands/cleanJobsInSet-3.lua index 0767885e3..98a98d906 100644 --- a/lib/commands/cleanJobsInSet-3.lua +++ b/lib/commands/cleanJobsInSet-3.lua @@ -36,8 +36,12 @@ local rangeEnd = -1 -- If we're only deleting _n_ items, avoid retrieving all items -- for faster performance +-- +-- Start from the tail of the list, since that's where oldest elements +-- are generally added for FIFO lists if limit > 0 then - rangeEnd = limit - 1 + rangeStart = -1 - limit + 1 + rangeEnd = -1 end local jobIds = rcall(command, setKey, rangeStart, rangeEnd) @@ -91,8 +95,8 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do if deletedCount < limit then -- We didn't delete enough. Look for more to delete - rangeStart = rangeStart + limit - rangeEnd = rangeEnd + limit + rangeStart = rangeStart - limit + rangeEnd = rangeEnd - limit jobIds = rcall(command, setKey, rangeStart, rangeEnd) end end diff --git a/test/test_queue.js b/test/test_queue.js index 93203faf8..e703d600b 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -2853,6 +2853,18 @@ describe('Queue', () => { expect(len).to.be.eql(2); }); + it('shouldn\'t clean anything if all jobs are in grace period', async () => { + await queue.add({ some: 'data' }); + await queue.add({ some: 'data' }); + + const cleaned = await queue.clean(5000, 'wait', 1); + expect(cleaned.length).to.be.eql(0); + + const cleaned2 = await queue.clean(5000, 'wait'); + expect(cleaned2.length).to.be.eql(0); + expect(await queue.count()).to.be.eql(2); + }); + it('should properly clean jobs from the priority set', done => { const client = new redis(6379, '127.0.0.1', {}); queue.add({ some: 'data' }, { priority: 5 });