diff --git a/docs/gitbook/.gitbook/assets/image (2) (1).png b/docs/gitbook/.gitbook/assets/image (2) (1).png new file mode 100644 index 0000000000..f95c89cbed Binary files /dev/null and b/docs/gitbook/.gitbook/assets/image (2) (1).png differ diff --git a/docs/gitbook/.gitbook/assets/image (2).png b/docs/gitbook/.gitbook/assets/image (2).png index f95c89cbed..7edf9c4098 100644 Binary files a/docs/gitbook/.gitbook/assets/image (2).png and b/docs/gitbook/.gitbook/assets/image (2).png differ diff --git a/docs/gitbook/.gitbook/assets/image (3).png b/docs/gitbook/.gitbook/assets/image (3).png new file mode 100644 index 0000000000..f99cb4797a Binary files /dev/null and b/docs/gitbook/.gitbook/assets/image (3).png differ diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index e9f492195f..1aaa22b10f 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -49,7 +49,9 @@ * [Introduction](bullmq-pro/introduction.md) * [Install](bullmq-pro/install.md) -* [Groups](bullmq-pro/groups.md) +* [Observables](bullmq-pro/observables.md) +* [Groups](bullmq-pro/groups/README.md) + * [Rate limiting](bullmq-pro/groups/rate-limiting.md) ## Bull diff --git a/docs/gitbook/api/bullmq.job.md b/docs/gitbook/api/bullmq.job.md index cc1c19a8ee..785b4c4136 100644 --- a/docs/gitbook/api/bullmq.job.md +++ b/docs/gitbook/api/bullmq.job.md @@ -27,6 +27,7 @@ export declare class Job(Optional) | | [name](./bullmq.job.name.md) | | NameType | The name of the Job | | [opts](./bullmq.job.opts.md) | | [JobsOptions](./bullmq.jobsoptions.md) | The options object for this job. | +| [parent?](./bullmq.job.parent.md) | | { id: string; queueKey: string; } | (Optional) | | [parentKey?](./bullmq.job.parentkey.md) | | string | (Optional) Fully qualified key (including the queue prefix) pointing to the parent of this job. | | [processedOn?](./bullmq.job.processedon.md) | | number | (Optional) Timestamp for when the job was processed. | | [progress](./bullmq.job.progress.md) | | number \| object | The progress a job has performed so far. | diff --git a/docs/gitbook/api/bullmq.job.movetocompleted.md b/docs/gitbook/api/bullmq.job.movetocompleted.md index 1990ea83de..89819a6206 100644 --- a/docs/gitbook/api/bullmq.job.movetocompleted.md +++ b/docs/gitbook/api/bullmq.job.movetocompleted.md @@ -9,7 +9,7 @@ Moves a job to the completed queue. Returned job to be used with Queue.prototype Signature: ```typescript -moveToCompleted(returnValue: ReturnType, token: string, fetchNext?: boolean): Promise<[JobJsonRaw, string] | []>; +moveToCompleted(returnValue: ReturnType, token: string, fetchNext?: boolean): Promise; ``` ## Parameters @@ -22,7 +22,7 @@ moveToCompleted(returnValue: ReturnType, token: string, fetchNext?: boolean): Pr Returns: -Promise<\[[JobJsonRaw](./bullmq.jobjsonraw.md), string\] \| \[\]> +Promise<[JobData](./bullmq.jobdata.md) \| \[\]> Returns the jobData of the next job in the waiting queue. diff --git a/docs/gitbook/api/bullmq.job.parent.md b/docs/gitbook/api/bullmq.job.parent.md new file mode 100644 index 0000000000..172733d130 --- /dev/null +++ b/docs/gitbook/api/bullmq.job.parent.md @@ -0,0 +1,14 @@ + + +[Home](./index.md) > [bullmq](./bullmq.md) > [Job](./bullmq.job.md) > [parent](./bullmq.job.parent.md) + +## Job.parent property + +Signature: + +```typescript +parent?: { + id: string; + queueKey: string; + }; +``` diff --git a/docs/gitbook/api/bullmq.jobdata.md b/docs/gitbook/api/bullmq.jobdata.md new file mode 100644 index 0000000000..8334355f8c --- /dev/null +++ b/docs/gitbook/api/bullmq.jobdata.md @@ -0,0 +1,13 @@ + + +[Home](./index.md) > [bullmq](./bullmq.md) > [JobData](./bullmq.jobdata.md) + +## JobData type + +Signature: + +```typescript +export declare type JobData = [JobJsonRaw | number, string?]; +``` +References: [JobJsonRaw](./bullmq.jobjsonraw.md) + diff --git a/docs/gitbook/api/bullmq.jobjsonraw.md b/docs/gitbook/api/bullmq.jobjsonraw.md index a394a1d837..7c3d7854b7 100644 --- a/docs/gitbook/api/bullmq.jobjsonraw.md +++ b/docs/gitbook/api/bullmq.jobjsonraw.md @@ -21,6 +21,7 @@ export interface JobJsonRaw | [id](./bullmq.jobjsonraw.id.md) | string | | | [name](./bullmq.jobjsonraw.name.md) | string | | | [opts](./bullmq.jobjsonraw.opts.md) | string | | +| [parent?](./bullmq.jobjsonraw.parent.md) | string | (Optional) | | [parentKey?](./bullmq.jobjsonraw.parentkey.md) | string | (Optional) | | [processedOn?](./bullmq.jobjsonraw.processedon.md) | string | (Optional) | | [progress](./bullmq.jobjsonraw.progress.md) | string | | diff --git a/docs/gitbook/api/bullmq.jobjsonraw.parent.md b/docs/gitbook/api/bullmq.jobjsonraw.parent.md new file mode 100644 index 0000000000..dfb94d6829 --- /dev/null +++ b/docs/gitbook/api/bullmq.jobjsonraw.parent.md @@ -0,0 +1,11 @@ + + +[Home](./index.md) > [bullmq](./bullmq.md) > [JobJsonRaw](./bullmq.jobjsonraw.md) > [parent](./bullmq.jobjsonraw.parent.md) + +## JobJsonRaw.parent property + +Signature: + +```typescript +parent?: string; +``` diff --git a/docs/gitbook/api/bullmq.md b/docs/gitbook/api/bullmq.md index ad50899c94..3391ed9058 100644 --- a/docs/gitbook/api/bullmq.md +++ b/docs/gitbook/api/bullmq.md @@ -45,6 +45,7 @@ | [isRedisInstance(obj)](./bullmq.isredisinstance.md) | | | [jobIdForGroup(jobOpts, data, queueOpts)](./bullmq.jobidforgroup.md) | | | [lengthInUtf8Bytes(str)](./bullmq.lengthinutf8bytes.md) | Checks the size of string for ascii/non-ascii characters | +| [raw2jobData(raw)](./bullmq.raw2jobdata.md) | | | [removeAllQueueData(client, queueName, prefix)](./bullmq.removeallqueuedata.md) | | | [tryCatch(fn, ctx, args)](./bullmq.trycatch.md) | | @@ -109,6 +110,7 @@ | [EntryId](./bullmq.entryid.md) | | | [EntryRaw](./bullmq.entryraw.md) | | | [FlowQueuesOpts](./bullmq.flowqueuesopts.md) | | +| [JobData](./bullmq.jobdata.md) | | | [MinimalQueue](./bullmq.minimalqueue.md) | | | [ParentOpts](./bullmq.parentopts.md) | | | [Processor](./bullmq.processor.md) | An async function that receives Jobs and handles them. | diff --git a/docs/gitbook/api/bullmq.raw2jobdata.md b/docs/gitbook/api/bullmq.raw2jobdata.md new file mode 100644 index 0000000000..5b060d817a --- /dev/null +++ b/docs/gitbook/api/bullmq.raw2jobdata.md @@ -0,0 +1,22 @@ + + +[Home](./index.md) > [bullmq](./bullmq.md) > [raw2jobData](./bullmq.raw2jobdata.md) + +## raw2jobData() function + +Signature: + +```typescript +export declare function raw2jobData(raw: any[]): [JobJsonRaw | number, string?] | []; +``` + +## Parameters + +| Parameter | Type | Description | +| --- | --- | --- | +| raw | any\[\] | | + +Returns: + +\[[JobJsonRaw](./bullmq.jobjsonraw.md) \| number, string?\] \| \[\] + diff --git a/docs/gitbook/api/bullmq.redisoptions.md b/docs/gitbook/api/bullmq.redisoptions.md index 3c1a3fc425..7b8575a72c 100644 --- a/docs/gitbook/api/bullmq.redisoptions.md +++ b/docs/gitbook/api/bullmq.redisoptions.md @@ -7,7 +7,7 @@ Signature: ```typescript -export declare type RedisOptions = BaseRedisOptions & { +export declare type RedisOptions = (BaseRedisOptions | ClusterOptions) & { skipVersionCheck?: boolean; }; ``` diff --git a/docs/gitbook/api/bullmq.scripts.md b/docs/gitbook/api/bullmq.scripts.md index b1528b6e0b..f1180f6997 100644 --- a/docs/gitbook/api/bullmq.scripts.md +++ b/docs/gitbook/api/bullmq.scripts.md @@ -31,7 +31,6 @@ export declare class Scripts | [moveToDelayed(queue, jobId, timestamp)](./bullmq.scripts.movetodelayed.md) | static | | | [moveToDelayedArgs(queue, jobId, timestamp)](./bullmq.scripts.movetodelayedargs.md) | static | | | [moveToFailedArgs(queue, job, failedReason, removeOnFailed, token, fetchNext)](./bullmq.scripts.movetofailedargs.md) | static | | -| [moveToFinished(queue, job, val, propVal, shouldRemove, target, token, fetchNext)](./bullmq.scripts.movetofinished.md) | static | | | [moveToFinishedArgs(queue, job, val, propVal, shouldRemove, target, token, fetchNext)](./bullmq.scripts.movetofinishedargs.md) | static | | | [moveToWaitingChildren(queue, jobId, token, opts)](./bullmq.scripts.movetowaitingchildren.md) | static | Move parent job to waiting-children state. | | [moveToWaitingChildrenArgs(queue, jobId, token, opts)](./bullmq.scripts.movetowaitingchildrenargs.md) | static | | diff --git a/docs/gitbook/api/bullmq.scripts.movetoactive.md b/docs/gitbook/api/bullmq.scripts.movetoactive.md index c0c60cbf1c..5b6e57d4ac 100644 --- a/docs/gitbook/api/bullmq.scripts.movetoactive.md +++ b/docs/gitbook/api/bullmq.scripts.movetoactive.md @@ -7,7 +7,7 @@ Signature: ```typescript -static moveToActive(worker: Worker, token: string, jobId?: string): Promise<[] | [number, undefined] | [JobJsonRaw, string]>; +static moveToActive(worker: Worker, token: string, jobId?: string): Promise<[] | [number | JobJsonRaw, string?]>; ``` ## Parameters @@ -20,5 +20,5 @@ static moveToActive(worker: Worker, token: stri Returns: -Promise<\[\] \| \[number, undefined\] \| \[[JobJsonRaw](./bullmq.jobjsonraw.md), string\]> +Promise<\[\] \| \[number \| [JobJsonRaw](./bullmq.jobjsonraw.md), string?\]> diff --git a/docs/gitbook/api/bullmq.scripts.movetocompleted.md b/docs/gitbook/api/bullmq.scripts.movetocompleted.md index 44f6e9d10d..55fc399f43 100644 --- a/docs/gitbook/api/bullmq.scripts.movetocompleted.md +++ b/docs/gitbook/api/bullmq.scripts.movetocompleted.md @@ -7,7 +7,7 @@ Signature: ```typescript -static moveToCompleted(queue: MinimalQueue, job: Job, returnvalue: any, removeOnComplete: boolean | number, token: string, fetchNext: boolean): Promise<[JobJsonRaw, string] | []>; +static moveToCompleted(queue: MinimalQueue, job: Job, returnvalue: any, removeOnComplete: boolean | number, token: string, fetchNext: boolean): Promise; ``` ## Parameters @@ -23,5 +23,5 @@ static moveToCompleted(queue: MinimalQueue, job: Job, returnvalue: any, removeOn Returns: -Promise<\[[JobJsonRaw](./bullmq.jobjsonraw.md), string\] \| \[\]> +Promise<[JobData](./bullmq.jobdata.md) \| \[\]> diff --git a/docs/gitbook/api/bullmq.scripts.movetofinished.md b/docs/gitbook/api/bullmq.scripts.movetofinished.md deleted file mode 100644 index 88a855ff95..0000000000 --- a/docs/gitbook/api/bullmq.scripts.movetofinished.md +++ /dev/null @@ -1,29 +0,0 @@ - - -[Home](./index.md) > [bullmq](./bullmq.md) > [Scripts](./bullmq.scripts.md) > [moveToFinished](./bullmq.scripts.movetofinished.md) - -## Scripts.moveToFinished() method - -Signature: - -```typescript -static moveToFinished(queue: MinimalQueue, job: Job, val: any, propVal: string, shouldRemove: boolean | number, target: string, token: string, fetchNext: boolean): Promise<[JobJsonRaw, string] | []>; -``` - -## Parameters - -| Parameter | Type | Description | -| --- | --- | --- | -| queue | [MinimalQueue](./bullmq.minimalqueue.md) | | -| job | [Job](./bullmq.job.md) | | -| val | any | | -| propVal | string | | -| shouldRemove | boolean \| number | | -| target | string | | -| token | string | | -| fetchNext | boolean | | - -Returns: - -Promise<\[[JobJsonRaw](./bullmq.jobjsonraw.md), string\] \| \[\]> - diff --git a/docs/gitbook/api/bullmq.worker_2.createjob.md b/docs/gitbook/api/bullmq.worker_2.createjob.md new file mode 100644 index 0000000000..a0d8961d6c --- /dev/null +++ b/docs/gitbook/api/bullmq.worker_2.createjob.md @@ -0,0 +1,23 @@ + + +[Home](./index.md) > [bullmq](./bullmq.md) > [Worker\_2](./bullmq.worker_2.md) > [createJob](./bullmq.worker_2.createjob.md) + +## Worker\_2.createJob() method + +Signature: + +```typescript +protected createJob(data: JobJsonRaw, jobId: string): Job; +``` + +## Parameters + +| Parameter | Type | Description | +| --- | --- | --- | +| data | [JobJsonRaw](./bullmq.jobjsonraw.md) | | +| jobId | string | | + +Returns: + +[Job](./bullmq.job.md)<any, any, string> + diff --git a/docs/gitbook/api/bullmq.worker_2.md b/docs/gitbook/api/bullmq.worker_2.md index 1b33631221..059293a4c8 100644 --- a/docs/gitbook/api/bullmq.worker_2.md +++ b/docs/gitbook/api/bullmq.worker_2.md @@ -37,6 +37,7 @@ export declare class WorkerThis method waits for current jobs to finalize before returning. | +| [createJob(data, jobId)](./bullmq.worker_2.createjob.md) | | | | [delay()](./bullmq.worker_2.delay.md) | | This function is exposed only for testing purposes. | | [getNextJob(token, { block })](./bullmq.worker_2.getnextjob.md) | | Returns a promise that resolves to the next job in queue. | | [isPaused()](./bullmq.worker_2.ispaused.md) | | Checks if worker is paused. | diff --git a/docs/gitbook/bullmq-pro/groups.md b/docs/gitbook/bullmq-pro/groups/README.md similarity index 86% rename from docs/gitbook/bullmq-pro/groups.md rename to docs/gitbook/bullmq-pro/groups/README.md index fd254fbafe..32adb2032b 100644 --- a/docs/gitbook/bullmq-pro/groups.md +++ b/docs/gitbook/bullmq-pro/groups/README.md @@ -4,9 +4,9 @@ Groups allows you to use only one queue yet distribute the jobs among groups so For example, imagine that you have 1 queue for processing video transcoding for all your users, you may have thousands of users in your application. You need to offload the transcoding operation since it is lengthy and CPU consuming. If you have many users that want to transcode many files, then in a non-grouped queue one user could fill the queue with jobs and the rest of the users will need to wait for that user to complete all its jobs before their jobs get processed. -Groups resolves this problem since jobs will be processed in a "[round-robin](https://en.wikipedia.org/wiki/Round-robin\_item\_allocation)" fashion among all the users. +Groups resolves this problem since jobs will be processed in a "[round-robin](https://en.wikipedia.org/wiki/Round-robin\_item\_allocation)" fashion among all the users. -![](<../.gitbook/assets/image (1).png>) +![](<../../.gitbook/assets/image (1).png>) If you have several workers or a concurrency factor larger than one, jobs will be processed in parallel, but they will be picked up from the groups as mentioned before following a round-robin ordering. @@ -23,32 +23,39 @@ Another way to see groups is like "virtual" queues. So instead of having one que In order to use the group functionality just use the group property in the job options when adding a job: ```typescript - import { QueuePro } from '@taskforcesh/bullmq-pro' - - const queue = new QueuePro(); +import { QueuePro } from '@taskforcesh/bullmq-pro'; - const job1 = await queue.add('test', { foo: 'bar1' }, { +const queue = new QueuePro(); + +const job1 = await queue.add( + 'test', + { foo: 'bar1' }, + { group: { id: 1, }, - }); - - const job2 = await queue.add('test', { foo: 'bar2' }, { + }, +); + +const job2 = await queue.add( + 'test', + { foo: 'bar2' }, + { group: { id: 2, }, - }); - + }, +); ``` In order to process the jobs, just use a pro worker as you normally do with standard workers: ```typescript -import { WorkerPro } from '@taskforcesh/bullmq-pro' +import { WorkerPro } from '@taskforcesh/bullmq-pro'; const worker = new WorkerPro('test', async job => { // Do something usefull. - + // You can also do something different depending on the group await doSomethingSpecialForMyGroup(job.opts.group); }); diff --git a/docs/gitbook/bullmq-pro/groups/rate-limiting.md b/docs/gitbook/bullmq-pro/groups/rate-limiting.md new file mode 100644 index 0000000000..a333bf6866 --- /dev/null +++ b/docs/gitbook/bullmq-pro/groups/rate-limiting.md @@ -0,0 +1,28 @@ +# Rate limiting + +A useful feature when using groups is to be able to rate limit the groups independently of each other, so you can evenly process the jobs belonging to many groups and still limit how many jobs per group are allowed to be processed by unit of time. + +The way the rate limiting works is that when the jobs for a given group exceed the maximum amount of jobs per unit of time that particular group gets rate limited. The jobs that belongs to this particular group will not be processed until the rate limit expires. + +For example "group 2" is rate limited in the following chart: + +![Rate limited group](<../../.gitbook/assets/image (3).png>) + +While one or more groups are rate limited, the rest of the jobs belonging to non rate limited groups will continue to be consumed normally or until they also get rate limited. + +The rate limit is configured on the worker instances: + +```typescript +import { WorkerPro } from '@taskforcesh/bullmq-pro'; + +const worker = new WorkerPro('myQueue', processFn, { + groups: { + limit: { + max: 100, // Limit to 100 jobs per second per group + duration 1000, + } + }, + connection +}); +``` + diff --git a/docs/gitbook/bullmq-pro/introduction.md b/docs/gitbook/bullmq-pro/introduction.md index 005aa2534a..58a20053cf 100644 --- a/docs/gitbook/bullmq-pro/introduction.md +++ b/docs/gitbook/bullmq-pro/introduction.md @@ -12,4 +12,4 @@ There is a growing number of features that will be implemented in BullMQ Pro, yo BullMQ Pro is licensed per organization with unlimited use on any projects. The current price is 95$/mo or 950$/year. You can try it for free by generating a new token on the "BullMQ Pro" tab in your account: -![](<../.gitbook/assets/image (2).png>) +![](<../.gitbook/assets/image (2) (1).png>) diff --git a/docs/gitbook/bullmq-pro/observables.md b/docs/gitbook/bullmq-pro/observables.md new file mode 100644 index 0000000000..20caad226e --- /dev/null +++ b/docs/gitbook/bullmq-pro/observables.md @@ -0,0 +1,8 @@ +# Observables + +Instead of returning regular promises, you can also return an Observable, this allows for some more advanced uses cases: + +* It makes possible to cleanly cancel a running job. +* You can define a "Time to live" (TTL) so that jobs that take too long time will be automatically cancelled. +* Since the last value returned by the observable is persisted, you could retry a job and continue where you left of, for example if the job implements a state machine or similar. + diff --git a/docs/gitbook/changelog.md b/docs/gitbook/changelog.md index 1fd6520508..6e3ba8f2d2 100644 --- a/docs/gitbook/changelog.md +++ b/docs/gitbook/changelog.md @@ -1,3 +1,37 @@ +## [1.57.4](https://github.com/taskforcesh/bullmq/compare/v1.57.3...v1.57.4) (2021-12-14) + + +### Bug Fixes + +* **move-to-active:** add try catch in moveToActive call ([#933](https://github.com/taskforcesh/bullmq/issues/933)) ([bab45b0](https://github.com/taskforcesh/bullmq/commit/bab45b05d08c625557e2df65921e12f48081d39c)) +* **redis-connection:** consider cluster redisOptions config ([#934](https://github.com/taskforcesh/bullmq/issues/934)) ([5130f63](https://github.com/taskforcesh/bullmq/commit/5130f63ad969efa9649ab8f9abf36a72e8f553f4)) + +## [1.57.3](https://github.com/taskforcesh/bullmq/compare/v1.57.2...v1.57.3) (2021-12-14) + + +### Bug Fixes + +* remove debug console.error ([#932](https://github.com/taskforcesh/bullmq/issues/932)) ([271aac3](https://github.com/taskforcesh/bullmq/commit/271aac3417bc7f76ac02435b456552677b2847db)) + +## [1.57.2](https://github.com/taskforcesh/bullmq/compare/v1.57.1...v1.57.2) (2021-12-11) + + +### Bug Fixes + +* **connection:** check instance options to console log deprecation message ([#927](https://github.com/taskforcesh/bullmq/issues/927)) ([fc1e2b9](https://github.com/taskforcesh/bullmq/commit/fc1e2b9f3f20db53f9dc7ecdfa4644f02acc9f83)) + + +### Performance Improvements + +* **add-job:** save parent data as json ([#859](https://github.com/taskforcesh/bullmq/issues/859)) ([556d4ee](https://github.com/taskforcesh/bullmq/commit/556d4ee427090f60270945a7fd438e2595bb43e9)) + +## [1.57.1](https://github.com/taskforcesh/bullmq/compare/v1.57.0...v1.57.1) (2021-12-11) + + +### Bug Fixes + +* **worker:** better handling of block timeout ([be4c933](https://github.com/taskforcesh/bullmq/commit/be4c933ae0a7a790d24a081b2ed4e7e1c0216e47)) + # [1.57.0](https://github.com/taskforcesh/bullmq/compare/v1.56.0...v1.57.0) (2021-12-08) @@ -37,7 +71,7 @@ ### Bug Fixes -* **stalled:** save finidhedOn when job stalled more than allowable limit ([#900](https://github.com/taskforcesh/bullmq/issues/900)) ([eb89edf](https://github.com/taskforcesh/bullmq/commit/eb89edf2f4eb85dedb1485de32e79331940a654f)) +* **stalled:** save finishedOn when job stalled more than allowable limit ([#900](https://github.com/taskforcesh/bullmq/issues/900)) ([eb89edf](https://github.com/taskforcesh/bullmq/commit/eb89edf2f4eb85dedb1485de32e79331940a654f)) ## [1.54.5](https://github.com/taskforcesh/bullmq/compare/v1.54.4...v1.54.5) (2021-11-26) diff --git a/package.json b/package.json index 2b15f02f87..00d2976e4e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bullmq", - "version": "1.57.0", + "version": "1.57.4", "description": "Queue for messages and jobs based on Redis", "main": "dist/index.js", "types": "dist/index.d.ts", @@ -45,7 +45,7 @@ "cron-parser": "^2.18.0", "get-port": "^5.1.1", "glob": "^7.2.0", - "ioredis": "^4.27.9", + "ioredis": "^4.28.2", "lodash": "^4.17.21", "msgpackr": "^1.4.6", "semver": "^6.3.0", @@ -68,7 +68,7 @@ "@types/chai": "^4.2.22", "@types/chai-as-promised": "^7.1.4", "@types/glob": "^7.2.0", - "@types/ioredis": "^4.27.4", + "@types/ioredis": "^4.28.2", "@types/lodash": "^4.14.173", "@types/mocha": "^5.2.7", "@types/msgpack": "^0.0.31", diff --git a/src/classes/job.ts b/src/classes/job.ts index c5ccf57999..0407d3a00b 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -15,7 +15,7 @@ import { } from '../utils'; import { QueueEvents } from './queue-events'; import { Backoffs } from './backoffs'; -import { MinimalQueue, ParentOpts, Scripts } from './scripts'; +import { MinimalQueue, ParentOpts, Scripts, JobData } from './scripts'; import { fromPairs } from 'lodash'; const logger = debuglog('bull'); @@ -52,6 +52,7 @@ export interface JobJsonRaw { stacktrace: string[]; returnvalue: string; parentKey?: string; + parent?: string; } export interface MoveToChildrenOpts { @@ -126,6 +127,7 @@ export class Job< * Fully qualified key (including the queue prefix) pointing to the parent of this job. */ parentKey?: string; + parent?: { id: string; queueKey: string }; protected toKey: (type: string) => string; @@ -163,6 +165,10 @@ export class Job< this.parentKey = getParentKey(opts.parent); + this.parent = opts.parent + ? { id: opts.parent.id, queueKey: opts.parent.queue } + : undefined; + this.toKey = queue.toKey.bind(queue); } @@ -281,6 +287,10 @@ export class Job< job.parentKey = json.parentKey; } + if (json.parent) { + job.parent = JSON.parse(json.parent); + } + return job; } @@ -406,7 +416,7 @@ export class Job< returnValue: ReturnType, token: string, fetchNext = true, - ): Promise<[JobJsonRaw, string] | []> { + ): Promise { await this.queue.waitUntilReady(); this.returnvalue = returnValue || void 0; diff --git a/src/classes/redis-connection.ts b/src/classes/redis-connection.ts index 2615cdc13f..00c947d722 100644 --- a/src/classes/redis-connection.ts +++ b/src/classes/redis-connection.ts @@ -48,9 +48,13 @@ export class RedisConnection extends EventEmitter { }; } else { this._client = opts; + let options = this._client.options; + if((options)?.redisOptions){ + options = (options).redisOptions; + } + if ( - (this._client.options).maxRetriesPerRequest || - this._client.options.enableReadyCheck + ((options).maxRetriesPerRequest || options.enableReadyCheck) ) { throw new Error(connectionErrorMessage); } @@ -64,8 +68,11 @@ export class RedisConnection extends EventEmitter { this.initializing.catch(err => this.emit('error', err)); } - private checkOptions(msg: string, options?: RedisOptions) { - if (options && (options.maxRetriesPerRequest || options.enableReadyCheck)) { + private checkOptions(msg: string, options?: IORedis.RedisOptions) { + if ( + options && + (options.maxRetriesPerRequest || options.enableReadyCheck) + ) { console.error(msg); } } diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index f327f760e2..175be21724 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -48,6 +48,8 @@ export type ParentOpts = { parentKey?: string; }; +export type JobData = [JobJsonRaw | number, string?]; + export class Scripts { static async isJobInList( queue: MinimalQueue, @@ -237,7 +239,7 @@ export class Scripts { return keys.concat(args); } - static async moveToFinished( + private static async moveToFinished( queue: MinimalQueue, job: Job, val: any, @@ -246,7 +248,7 @@ export class Scripts { target: string, token: string, fetchNext: boolean, - ): Promise<[JobJsonRaw, string] | []> { + ): Promise { const client = await queue.client; const args = this.moveToFinishedArgs( queue, @@ -318,7 +320,7 @@ export class Scripts { removeOnComplete: boolean | number, token: string, fetchNext: boolean, - ): Promise<[JobJsonRaw, string] | []> { + ): Promise { return this.moveToFinished( queue, job, @@ -780,7 +782,10 @@ export class Scripts { */ } -function raw2jobData(raw: any[]): [JobJsonRaw, string] | [] { +export function raw2jobData(raw: any[]): [JobJsonRaw | number, string?] | [] { + if (typeof raw === 'number') { + return [raw, void 0] as [number, undefined]; + } if (raw) { const jobData = raw[0]; if (jobData.length) { diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 1e4849ff15..f895500c77 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -111,6 +111,8 @@ export class Worker< private drained: boolean; private waiting = false; private running = false; + private blockTimeout = 0; + protected processFn: Processor; private resumeWorker: () => void; @@ -196,6 +198,10 @@ export class Worker< return this.processFn(job, token); } + protected createJob(data: JobJsonRaw, jobId: string) { + return Job.fromJSON(this, data, jobId); + } + /** * * Waits until the worker is ready to start processing jobs. @@ -348,8 +354,14 @@ export class Worker< } protected async moveToActive(token: string, jobId?: string) { - const [jobData, id] = await Scripts.moveToActive(this, token, jobId); - return this.nextJobFromJobData(jobData, id); + try { + const [jobData, id] = await Scripts.moveToActive(this, token, jobId); + return await this.nextJobFromJobData(jobData, id); + } catch (error) { + if (isNotConnectionError(error)) { + this.emit('error', error); + } + } } private async waitForJob() { @@ -364,10 +376,16 @@ export class Worker< try { this.waiting = true; + + const blockTimeout = Math.max( + this.blockTimeout ? this.blockTimeout / 1000 : opts.drainDelay, + 0.01, + ); + jobId = await client.brpoplpush( this.keys.wait, this.keys.active, - opts.drainDelay, + blockTimeout, ); } catch (error) { if (isNotConnectionError(error)) { @@ -392,31 +410,33 @@ export class Worker< jobData?: JobJsonRaw | number, jobId?: string, ): Promise> { - if (jobData) { - this.drained = false; + // NOTE: This is not really optimal in all cases since a new job would could arrive at the wait + // list and this worker will not start processing it directly. + // Best would be to emit drain and block for rateKeyExpirationTime + if (typeof jobData === 'number') { + if (!this.drained) { + this.emit('drained'); + this.drained = true; + } - // - // Check if the queue is rate limited. jobData will be the amount - // of rate limited jobs. - // - - // NOTE: This is not really optimal in all cases since a new job would could arrive at the wait - // list and this worker will not start processing it directly. - // Best would be to emit drain and block for rateKeyExpirationTime - if (typeof jobData === 'number') { - if (this.opts.limiter.workerDelay) { - const rateKeyExpirationTime = jobData; - await delay(rateKeyExpirationTime); - } + // workerDelay left for backwards compatibility although not recommended to use. + if (this.opts?.limiter?.workerDelay) { + const rateKeyExpirationTime = jobData; + await delay(rateKeyExpirationTime); } else { - const job = Job.fromJSON(this, jobData, jobId); - if (job.opts.repeat) { - const repeat = await this.repeat; - await repeat.addNextRepeatableJob(job.name, job.data, job.opts); - } - return job; + this.blockTimeout = jobData; + } + } else if (jobData) { + this.drained = false; + const job = this.createJob(jobData, jobId); + if (job.opts.repeat) { + const repeat = await this.repeat; + await repeat.addNextRepeatableJob(job.name, job.data, job.opts); } + return job; } else if (!this.drained) { + this.blockTimeout = 0; + this.emit('drained'); this.drained = true; } @@ -483,7 +503,7 @@ export class Worker< const handleFailed = async (err: Error) => { try { - await job.moveToFailed(err, token); + const failed = await job.moveToFailed(err, token); this.emit('failed', job, err, 'active'); } catch (err) { this.emit('error', err); diff --git a/src/commands/addJob-9.lua b/src/commands/addJob-9.lua index 748ccf747c..34f05d37eb 100644 --- a/src/commands/addJob-9.lua +++ b/src/commands/addJob-9.lua @@ -51,18 +51,30 @@ local data = ARGV[2] local opts = cmsgpack.unpack(ARGV[3]) local parentKey = args[5] +local parentId +local parentQueueKey +local parentData + +-- Includes +--- @include "includes/destructureJobKey" if parentKey ~= nil then if rcall("EXISTS", parentKey) ~= 1 then return -5 end + + parentId = getJobIdFromKey(parentKey) + parentQueueKey = getJobKeyPrefix(parentKey, ":" .. parentId) + local parent = {} + parent['id'] = parentId + parent['queueKey'] = parentQueueKey + parentData = cjson.encode(parent) end local jobCounter = rcall("INCR", KEYS[4]) -- Includes --- @include "includes/updateParentDepsIfNeeded" ---- @include "includes/destructureJobKey" local parentDependenciesKey = args[7] if args[2] == "" then @@ -75,15 +87,13 @@ else if parentKey ~= nil then if rcall("ZSCORE", KEYS[7], jobId) ~= false then local returnvalue = rcall("HGET", jobIdKey, "returnvalue") - local parentId = getJobIdFromKey(parentKey) - local parentQueueKey = getJobKeyPrefix(parentKey, ":" .. parentId) updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey, parentId, jobIdKey, returnvalue) else if parentDependenciesKey ~= nil then rcall("SADD", parentDependenciesKey, jobIdKey) end end - rcall("HMSET", jobIdKey, "parentKey", parentKey) + rcall("HMSET", jobIdKey, "parentKey", parentKey, "parent", parentData) end return jobId .. "" -- convert to string end @@ -96,10 +106,10 @@ local priority = opts['priority'] or 0 local timestamp = args[4] if parentKey ~= nil then - rcall("HMSET", jobIdKey, "name", args[3], "data", ARGV[2], "opts", jsonOpts, - "timestamp", timestamp, "delay", delay, "priority", priority, "parentKey", parentKey) + rcall("HMSET", jobIdKey, "name", args[3], "data", ARGV[2], "opts", jsonOpts, + "timestamp", timestamp, "delay", delay, "priority", priority, "parentKey", parentKey, "parent", parentData) else - rcall("HMSET", jobIdKey, "name", args[3], "data", ARGV[2], "opts", jsonOpts, + rcall("HMSET", jobIdKey, "name", args[3], "data", ARGV[2], "opts", jsonOpts, "timestamp", timestamp, "delay", delay, "priority", priority ) end diff --git a/src/interfaces/redis-options.ts b/src/interfaces/redis-options.ts index 6e2d1fc92c..bec30d2cec 100644 --- a/src/interfaces/redis-options.ts +++ b/src/interfaces/redis-options.ts @@ -1,6 +1,6 @@ -import { Redis, RedisOptions as BaseRedisOptions, Cluster } from 'ioredis'; +import { Redis, RedisOptions as BaseRedisOptions, ClusterOptions, Cluster } from 'ioredis'; -export type RedisOptions = BaseRedisOptions & { +export type RedisOptions = (BaseRedisOptions | ClusterOptions) & { skipVersionCheck?: boolean; }; diff --git a/tests/test_flow.ts b/tests/test_flow.ts index 536e2cac1c..3c3a6ea5a6 100644 --- a/tests/test_flow.ts +++ b/tests/test_flow.ts @@ -112,6 +112,10 @@ describe('flows', () => { expect(children[0].job.id).to.be.ok; expect(children[0].job.data.foo).to.be.eql('bar'); + expect(children[0].job.parent).to.deep.equal({ + id: job.id, + queueKey: `bull:${parentQueueName}`, + }); expect(children[1].job.id).to.be.ok; expect(children[1].job.data.foo).to.be.eql('baz'); expect(children[2].job.id).to.be.ok; @@ -1570,6 +1574,15 @@ describe('flows', () => { 'waiting', ); + for (let i = 0; i < tree.children.length; i++) { + const child = tree.children[i]; + const childJob = await Job.fromId(queue, child.job.id); + expect(childJob.parent).to.deep.equal({ + id: tree.job.id, + queueKey: `bull:${parentQueueName}`, + }); + } + await tree.job.remove(); const parentQueue = new Queue(parentQueueName, { connection }); diff --git a/yarn.lock b/yarn.lock index 60baf98197..8101435242 100644 --- a/yarn.lock +++ b/yarn.lock @@ -798,10 +798,10 @@ "@types/minimatch" "*" "@types/node" "*" -"@types/ioredis@^4.27.4": - version "4.27.4" - resolved "https://registry.yarnpkg.com/@types/ioredis/-/ioredis-4.27.4.tgz#2caf9d0222640c9d7ce278fca2f9892c1323f9c8" - integrity sha512-uTAA/woL//GxXQI1e9FuUoDZCpP8yn5LXQdea1IEFyLtb8GP2w3HfOE+SqglF6QSAp/3cZLWzrMhHqWSYI3bfg== +"@types/ioredis@^4.28.2": + version "4.28.2" + resolved "https://registry.yarnpkg.com/@types/ioredis/-/ioredis-4.28.2.tgz#7ff68944cbb58afefb2a1d8b250ebf48760e93c9" + integrity sha512-kOCG4ExodOAoslFjsUzwAK1fxqVRx3JX5m7lm3MkWf5WkDwNTtXBtlBig6cQyFwS7P7TPaTWRreQaMwl7e1FFA== dependencies: "@types/node" "*" @@ -3354,10 +3354,10 @@ into-stream@^6.0.0: from2 "^2.3.0" p-is-promise "^3.0.0" -ioredis@^4.27.9: - version "4.27.9" - resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-4.27.9.tgz#c27bbade9724f0b8f84c279fb1d567be785ba33d" - integrity sha512-hAwrx9F+OQ0uIvaJefuS3UTqW+ByOLyLIV+j0EH8ClNVxvFyH9Vmb08hCL4yje6mDYT5zMquShhypkd50RRzkg== +ioredis@^4.28.2: + version "4.28.2" + resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-4.28.2.tgz#493ccd5d869fd0ec86c96498192718171f6c9203" + integrity sha512-kQ+Iv7+c6HsDdPP2XUHaMv8DhnSeAeKEwMbaoqsXYbO+03dItXt7+5jGQDRyjdRUV2rFJbzg7P4Qt1iX2tqkOg== dependencies: cluster-key-slot "^1.1.0" debug "^4.3.1"