Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: faster delay handling. fixes #1893 #1897

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 20 additions & 15 deletions lib/commands/moveToActive-8.lua
Expand Up @@ -10,7 +10,7 @@
KEYS[1] wait key
KEYS[2] active key
KEYS[3] priority key
KEYS[4] active event key
KEYS[4] active key
KEYS[5] stalled key

-- Rate limiting
Expand All @@ -34,12 +34,11 @@

local rcall = redis.call

local rateLimit = function(jobId, maxJobs)
local rateLimiterKey = KEYS[6];
local rateLimit = function(activeKey, rateLimiterKey, delayedKey, jobId, timestamp, maxJobs, timeUnit, bounceBack, groupLimit)
local limiterIndexTable = rateLimiterKey .. ":index"

-- Rate limit by group?
if(ARGV[9]) then
if(groupLimit) then
local group = string.match(jobId, "[^:]+$")
if group ~= nil then
rateLimiterKey = rateLimiterKey .. ":" .. group
Expand Down Expand Up @@ -68,7 +67,7 @@ local rateLimit = function(jobId, maxJobs)

if numLimitedJobs > 0 then
-- Note, add some slack to compensate for drift.
delay = ((numLimitedJobs * ARGV[7] * 1.1) / maxJobs) + tonumber(rcall("PTTL", rateLimiterKey))
delay = ((numLimitedJobs * timeUnit * 1.1) / maxJobs) + tonumber(rcall("PTTL", rateLimiterKey))
end
end

Expand All @@ -80,31 +79,29 @@ local rateLimit = function(jobId, maxJobs)
if (delay == 0) and (jobCounter >= maxJobs) then
-- Seems like there are no current rated limited jobs, but the jobCounter has exceeded the number of jobs for this unit of time so we need to rate limit this job.
local exceedingJobs = jobCounter - maxJobs
delay = tonumber(rcall("PTTL", rateLimiterKey)) + ((exceedingJobs) * ARGV[7]) / maxJobs
delay = tonumber(rcall("PTTL", rateLimiterKey)) + ((exceedingJobs) * timeUnit) / maxJobs
end

if delay > 0 then
local bounceBack = ARGV[8]
if bounceBack == 'false' then
local timestamp = delay + tonumber(ARGV[4])
local timestamp = delay + tonumber(timestamp)
-- put job into delayed queue
rcall("ZADD", KEYS[7], timestamp * 0x1000 + bit.band(jobCounter, 0xfff), jobId)
rcall("PUBLISH", KEYS[7], timestamp)
rcall("ZADD", delayedKey, timestamp * 0x1000 + bit.band(jobCounter, 0xfff), jobId)
rcall("PUBLISH", delayedKey, timestamp)
rcall("SADD", limitedSetKey, jobId)

-- store index so that we can delete rate limited data
rcall("HSET", limiterIndexTable, jobId, limitedSetKey)

end

-- remove from active queue
rcall("LREM", KEYS[2], 1, jobId)
rcall("LREM", activeKey, 1, jobId)
return true
else
-- false indicates not rate limited
-- increment jobCounter only when a job is not rate limited
if (jobCounter == 0) then
rcall("PSETEX", rateLimiterKey, ARGV[7], 1)
rcall("PSETEX", rateLimiterKey, timeUnit, 1)
else
rcall("INCR", rateLimiterKey)
end
Expand All @@ -127,8 +124,16 @@ if jobId then
local maxJobs = tonumber(ARGV[6])

if maxJobs then
if rateLimit(jobId, maxJobs) then
return
if rateLimit(KEYS[2], KEYS[6], KEYS[7], jobId, ARGV[4], maxJobs, ARGV[7], ARGV[8], ARGV[9]) then
-- Check if we can return a delayed job instead
jobId = rcall("ZRANGEBYSCORE", KEYS[7], 0, tonumber(ARGV[4]) * 0x1000, "LIMIT", 0, 1)[1]
if jobId then
if rateLimit(KEYS[2], KEYS[6], KEYS[7], jobId, ARGV[4], maxJobs, ARGV[7], ARGV[8], ARGV[9]) then
return
end
else
return
end
end
end

Expand Down
118 changes: 0 additions & 118 deletions lib/commands/moveToFinished-7.lua

This file was deleted.