Skip to content

Commit

Permalink
fix(job): fetch parent before job moves to complete (#1580)
Browse files Browse the repository at this point in the history
  • Loading branch information
S3bb1 committed Dec 20, 2022
1 parent 3fa5d33 commit 6a6c0dc
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 9 deletions.
2 changes: 0 additions & 2 deletions src/classes/scripts.ts
Expand Up @@ -292,8 +292,6 @@ export class Scripts {
keepJobs,
limiter: opts.limiter,
lockDuration: opts.lockDuration,
parent: job.opts?.parent,
parentKey: job.parentKey,
attempts: job.opts.attempts,
attemptsMade: job.attemptsMade,
maxMetricsSize: opts.metrics?.maxDataPoints
Expand Down
19 changes: 12 additions & 7 deletions src/commands/moveToFinished-12.lua
Expand Up @@ -35,8 +35,6 @@
opts - token - lock token
opts - keepJobs
opts - lockDuration - lock duration in milliseconds
opts - parent - parent data
opts - parentKey
opts - attempts max attempts
opts - attemptsMade
opts - maxMetricsSize
Expand Down Expand Up @@ -72,9 +70,6 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
local opts = cmsgpack.unpack(ARGV[9])

local token = opts['token']
local parentId = opts['parent'] and opts['parent']['id'] or ""
local parentQueueKey = opts['parent'] and opts['parent']['queue'] or ""
local parentKey = opts['parentKey'] or ""
local attempts = opts['attempts']
local attemptsMade = opts['attemptsMade']
local maxMetricsSize = opts['maxMetricsSize']
Expand All @@ -91,7 +86,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
if lockToken then
-- Lock exists but token does not match
return -6
else
else
-- Lock is missing completely
return -2
end
Expand All @@ -102,6 +97,16 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
return -4
end

local parentReferences = rcall("HMGET", jobIdKey, "parentKey", "parent")
local parentKey = parentReferences[1] or ""
local parentId = ""
local parentQueueKey = ""
if parentReferences[2] ~= false then
local jsonDecodedParent = cjson.decode(parentReferences[2])
parentId = jsonDecodedParent['id']
parentQueueKey = jsonDecodedParent['queueKey']
end

local jobId = ARGV[1]
local timestamp = ARGV[2]

Expand Down Expand Up @@ -188,7 +193,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
jobId = rcall("RPOPLPUSH", KEYS[1], KEYS[2])

-- If jobId is special ID 0:delay, then there is no job to process
if jobId then
if jobId then
if string.sub(jobId, 1, 2) == "0:" then
rcall("LREM", KEYS[2], 1, jobId)
else
Expand Down
63 changes: 63 additions & 0 deletions tests/test_flow.ts
Expand Up @@ -893,6 +893,69 @@ describe('flows', () => {

await flow.close();
});

it('processes parent jobs added while a child job is active', async function () {
this.timeout(10_000);

const worker = new Worker(
queueName,
async () => {
await new Promise(s => {
setTimeout(s, 1_000);
});
},
{
connection,
},
);

const completing = new Promise<void>(resolve => {
worker.on('completed', (job: Job) => {
if (job.id === 'tue') {
resolve();
}
});
});

const flow = new FlowProducer({ connection });
await flow.add({
queueName,
name: 'mon',
opts: {
jobId: 'mon',
},
children: [],
});

await new Promise(s => {
setTimeout(s, 500);
});

const tree = await flow.add({
queueName,
name: 'tue',
opts: {
jobId: 'tue',
},
children: [
{
name: 'mon',
queueName,
opts: {
jobId: 'mon',
},
},
],
});

await completing;

const state = await tree.job.getState();

expect(state).to.be.equal('completed');

await flow.close();
});
});

describe('when custom prefix is set in flow producer', async () => {
Expand Down

0 comments on commit 6a6c0dc

Please sign in to comment.