
Add a way to expire jobs #301

Open
gajus opened this issue Oct 27, 2020 · 14 comments
Labels
enhancement New feature or request

Comments


gajus commented Oct 27, 2020

If a job is not accepted within X seconds, I would like to expire it.

I've had cases where a queue of jobs would build up with thousands of pending tasks.

Whenever the worker service started, it would soon crash, unable to keep up with the pressure.

That in turn produces even more pending tasks, and so on.

This could be at least partially mitigated by indicating a time after which a job should no longer be attempted.


acro5piano commented Apr 7, 2021

How about setting a jobId and discarding the job by that ID?

    import { Queue } from 'bullmq'
    import * as uuid from 'uuid'

    const queue = new Queue('my-queue')

    const jobId = uuid.v4()
    // Pass the custom job ID via the options argument (third parameter),
    // not in the job data, so that getJob() can look it up later
    await queue.add('my-job', { some: 'data' }, { jobId })

    // Later...
    const job = await queue.getJob(jobId)
    job.discard() // or job.remove() to delete it from the queue entirely

manast added the enhancement label on Apr 8, 2021

manast commented Apr 8, 2021

This would actually be relatively easy to implement by keeping a TTL in the wait list: when a job reaches the tip of the queue and its TTL has passed, we can just discard it. However, I am not sure this feature is in high demand, so I will give it low priority; if anybody wants it, they can submit a PR.


manast commented Apr 8, 2021

Note: when discarding a job, we probably want to actually move it to failed so that we can keep an accounting of the discarded jobs.

WarmVissarutRonaldoDude commented

It seems like a useful use case. For example, if a job is stale and no worker has processed it for a while (e.g. with a TTL of 24 hours), the queue scheduler could move those jobs to failed, and they would then be handled according to removeOnFail (kept or deleted, depending on that config).

Smillence commented

Expanding on @manast's suggestion above: if the TTL is a constant for all jobs, you don't need to add it to every job's data. You can simply store it in a const inside the worker's processor function, read the Job.timestamp property (the time the job was added to the queue), and compare it with Date.now(), as in the sketch below.
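
A minimal sketch of that approach, assuming BullMQ's Worker API and its UnrecoverableError (the queue name and the 24-hour TTL are made up for illustration):

    import { Worker, UnrecoverableError } from 'bullmq'

    const MAX_AGE_MS = 24 * 60 * 60 * 1000 // treat jobs older than 24h as expired

    const worker = new Worker('my-queue', async (job) => {
      // job.timestamp records when the job was added to the queue
      if (Date.now() - job.timestamp > MAX_AGE_MS) {
        // UnrecoverableError sends the job straight to failed without retries,
        // which keeps an account of the discarded jobs as suggested above
        throw new UnrecoverableError('job expired before processing')
      }
      // ...normal processing...
    })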

runningman84 commented

From my point of view, Redis itself supports a TTL on keys. If that feature were used, we could make sure that Redis itself purges old items.

In my use case we queue a stream of metrics data for further processing; if the processing stalls, the Redis database fills up until you get out-of-memory errors. Old metrics data is not important for us anyway, so if we could add a TTL to all entries the database could never run out of memory (yes, we would miss older metrics, but that is okay for our use case).


manast commented Jul 7, 2022

@runningman84 BullMQ supports limiting the number of jobs kept after they complete or fail; is that not enough for your use case?
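
For reference, a sketch of those options. BullMQ's removeOnComplete/removeOnFail accept a count or an age in seconds; the queue name and numbers here are illustrative:

    import { Queue } from 'bullmq'

    const queue = new Queue('metrics')

    await queue.add('sample', { value: 42 }, {
      removeOnComplete: { count: 1000 }, // keep at most 1000 completed jobs
      removeOnFail: { age: 24 * 3600 },  // drop failed jobs older than 24 hours
    })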

runningman84 commented

The thing is, we want to limit the number of active/waiting jobs: the job queue should not be longer than x messages, and messages older than x minutes should be expired without even being processed.


manast commented Jul 10, 2022

We do not have a function like that; however, we have discussed in the past the possibility of a worker discarding jobs that are over a certain age.


manast commented Jul 10, 2022

In practice it would have the same effect as what you need, if I am not mistaken.


Rush commented Oct 4, 2022

I was looking for such an option. In normal system operation it's probably not all that needed, but I had some cases in development where workers were broken and some code accumulated a ton of jobs that should simply be scrapped.


antoniojtorres commented Feb 1, 2023

Thanks for the great package, manast and team.

I can add my perspective, as this feature would be useful for the project I'm working on. Our scenario is one where we generate a daily list of tasks to perform. The list is ordered by priority, with the most important things at the top, going down to things that are not mandatory but that we can perform if the mandatory tasks finish early; it would be nice to collect the results of such tasks.

However, at recurring intervals the mandatory tasks get added back in and push the low-priority stuff back. This can apply to any scenario where it just isn't worth processing a job after x amount of time and it's worthwhile to free the worker for something of higher priority.

Having a waiting TTL would provide a straightforward way to ensure that these stale jobs don't slowly balloon in number.

The suggestion in bull's issue 2522 by manast is workable, but it would cause gaps in processing if there are thousands or more items that become stale.

My question: is it possible to leverage an internal Redis process to allow certain items to expire without performing a drain as suggested above?

Thanks again.


manast commented Feb 3, 2023

@antoniojtorres

> The suggestion in bull's issue 2522 by manast is workable, but it would cause gaps in processing if there are thousands or more items that become stale.

I do not understand how it would cause gaps 🤔, as discarded jobs are discarded very quickly and then the non-discarded ones are processed normally. This is lazy vs. proactive removal of old jobs. Even the Redis EXPIRE command works this way, unless you have a maxmemory policy that evicts keys when memory is above a certain threshold.

The only alternative I can think of is to have a separate process that prunes old jobs. It would need to examine the wait list one job at a time and determine whether each job has exceeded a given age. But this is more complex and, in practice, more resource intensive, as you need to examine the jobs regularly instead of just once, when a job arrives at the worker. The only advantage of such a method that I can think of is that it could potentially free memory sooner...
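
A rough sketch of what such a pruning process might look like. This is hypothetical: pruneStaleJobs, the queue name, and the one-hour threshold are made up, and it assumes BullMQ's Queue.getWaiting() and Job.remove():

    import { Queue } from 'bullmq'

    const queue = new Queue('my-queue')
    const MAX_AGE_MS = 60 * 60 * 1000 // prune waiting jobs older than one hour

    async function pruneStaleJobs() {
      // Examine a page of the wait list, one job at a time
      const waiting = await queue.getWaiting(0, 999)
      for (const job of waiting) {
        if (Date.now() - job.timestamp > MAX_AGE_MS) {
          await job.remove() // or move it to failed to keep an account of it
        }
      }
    }

    // Proactive removal: the scan has to run regularly, which is the extra cost
    setInterval(pruneStaleJobs, 60 * 1000)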

jtsmedley commented

+1 for support. In my use case, jobs that aren't picked up within X time have already been processed by a recurring worker in another daemon, so dropping them out of the key store is optimal.
