diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 12b1d17336..512ab40ed2 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -292,8 +292,6 @@ export class Scripts { keepJobs, limiter: opts.limiter, lockDuration: opts.lockDuration, - parent: job.opts?.parent, - parentKey: job.parentKey, attempts: job.opts.attempts, attemptsMade: job.attemptsMade, maxMetricsSize: opts.metrics?.maxDataPoints diff --git a/src/commands/moveToFinished-12.lua b/src/commands/moveToFinished-12.lua index ed566308b2..50d324d5dd 100644 --- a/src/commands/moveToFinished-12.lua +++ b/src/commands/moveToFinished-12.lua @@ -35,8 +35,6 @@ opts - token - lock token opts - keepJobs opts - lockDuration - lock duration in milliseconds - opts - parent - parent data - opts - parentKey opts - attempts max attempts opts - attemptsMade opts - maxMetricsSize @@ -72,9 +70,6 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists local opts = cmsgpack.unpack(ARGV[9]) local token = opts['token'] - local parentId = opts['parent'] and opts['parent']['id'] or "" - local parentQueueKey = opts['parent'] and opts['parent']['queue'] or "" - local parentKey = opts['parentKey'] or "" local attempts = opts['attempts'] local attemptsMade = opts['attemptsMade'] local maxMetricsSize = opts['maxMetricsSize'] @@ -91,7 +86,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists if lockToken then -- Lock exists but token does not match return -6 - else + else -- Lock is missing completely return -2 end @@ -102,6 +97,16 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists return -4 end + local parentReferences = rcall("HMGET", jobIdKey, "parentKey", "parent") + local parentKey = parentReferences[1] or "" + local parentId = "" + local parentQueueKey = "" + if parentReferences[2] ~= false then + local jsonDecodedParent = cjson.decode(parentReferences[2]) + parentId = jsonDecodedParent['id'] + parentQueueKey = jsonDecodedParent['queueKey'] + end + local jobId = ARGV[1] local timestamp = ARGV[2] @@ -188,7 +193,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists jobId = rcall("RPOPLPUSH", KEYS[1], KEYS[2]) -- If jobId is special ID 0:delay, then there is no job to process - if jobId then + if jobId then if string.sub(jobId, 1, 2) == "0:" then rcall("LREM", KEYS[2], 1, jobId) else diff --git a/tests/test_flow.ts b/tests/test_flow.ts index d9500f1f32..164caffca3 100644 --- a/tests/test_flow.ts +++ b/tests/test_flow.ts @@ -893,6 +893,69 @@ describe('flows', () => { await flow.close(); }); + + it('processes parent jobs added while a child job is active', async function () { + this.timeout(10_000); + + const worker = new Worker( + queueName, + async () => { + await new Promise(s => { + setTimeout(s, 1_000); + }); + }, + { + connection, + }, + ); + + const completing = new Promise(resolve => { + worker.on('completed', (job: Job) => { + if (job.id === 'tue') { + resolve(); + } + }); + }); + + const flow = new FlowProducer({ connection }); + await flow.add({ + queueName, + name: 'mon', + opts: { + jobId: 'mon', + }, + children: [], + }); + + await new Promise(s => { + setTimeout(s, 500); + }); + + const tree = await flow.add({ + queueName, + name: 'tue', + opts: { + jobId: 'tue', + }, + children: [ + { + name: 'mon', + queueName, + opts: { + jobId: 'mon', + }, + }, + ], + }); + + await completing; + + const state = await tree.job.getState(); + + expect(state).to.be.equal('completed'); + + await flow.close(); + }); }); describe('when custom prefix is set in flow producer', async () => {