
Unique queues (prevent duplicated / the same jobs twice in the queue) #2151

Closed
royduin opened this issue Mar 25, 2020 · 33 comments

Comments


royduin commented Mar 25, 2020

It would be nice to have unique queues baked into the framework by default, so that when the same job is pushed twice, only one copy makes it into the queue.

References:

- https://github.com/mlntn/laravel-unique-queue
- https://github.com/mingalevme/illuminate-uqueue
- https://github.com/mbm-rafal/laravel-single-dispatch


mfn commented Mar 26, 2020

Nice list!

I like the implementation of https://github.com/mlntn/laravel-unique-queue, but unless I missed it, it requires you to enable this for whole queues; you can't selectively do it on a per-job basis. It's also strictly a Redis feature, though on the other hand it supports Horizon (which AFAIK only supports Redis anyway).

https://github.com/mingalevme/illuminate-uqueue seems more complicated; it supports databases but doesn't mention Horizon (though maybe it works transparently).

https://github.com/mbm-rafal/laravel-single-dispatch is something else entirely: it only deduplicates in-memory dispatching to prevent double firing of events. This lib also clashes if you already want/need to use another dispatcher (which I'm doing, e.g. https://github.com/fntneves/laravel-transactional-events).


elfeffe commented Jun 7, 2020

This is really needed. I have also implemented my own unique ID for my database driver. The list is great, but I think this should be added to the core.


atapatel commented Oct 7, 2020

Following this thread.


royduin commented Oct 7, 2020

@themsaid? You're the queue guru?

An example: let's say we have products and a "product updated" event which triggers indexing to, for example, Algolia. When the queue is busy with 1000 jobs and we update a product 10 times, 10 jobs will be added to the queue. When the queue finally gets there after processing the 1000 jobs, the same thing is executed 10 times.


paras-malhotra commented Oct 19, 2020

Implemented something similar in laravel/framework#34794.

EDIT: It isn't exactly the same use case though. Instead of "pushing unique", it just "processes unique", meaning if there's an overlapping job, you can get rid of it with the dontRelease method.
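For context, a minimal sketch of how that middleware attaches to a job; the UpdateSearchIndex job and the product-ID lock key here are hypothetical, not taken from the PR:

&lt;?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;

class UpdateSearchIndex implements ShouldQueue
{
    use InteractsWithQueue, Queueable;

    public $productId;

    public function __construct(int $productId)
    {
        $this->productId = $productId;
    }

    // Only one job per product ID is processed at a time; dontRelease()
    // deletes overlapping duplicates instead of releasing them back onto
    // the queue.
    public function middleware()
    {
        return [(new WithoutOverlapping($this->productId))->dontRelease()];
    }

    public function handle()
    {
        // ... push the product to the search index
    }
}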


royduin commented Oct 19, 2020

Great work @paras-malhotra! But preventing jobs from overlapping isn't the same as uniqueness, right? With your middleware the jobs are still added to the queue and still get executed.

@elfeffe
Copy link

elfeffe commented Oct 19, 2020

Great, I will check it later

@paras-malhotra

But preventing jobs from overlapping isn't the same as uniqueness, right? With your middleware the jobs are still added to the queue and still get executed.

@royduin, yeah the jobs are still added to the queue but you have the option to delete jobs if duplicates are being processed at the same time.


royduin commented Oct 19, 2020

So let's take my example from earlier, with 2 queue workers running. When the jobs are processed with your middleware, the first one is executed, and the second, third, fourth, etc. are not executed by the second worker until the first one is finished. So it's possible that, for example, the first job is executed and then the 7th. But just the first one is enough.

I think it's better to check whether there is already a job in the queue before adding another one, to get real uniqueness.


paras-malhotra commented Oct 19, 2020

Yes, the subsequent jobs won't be executed until the first one is finished. If you use dontRelease, then the subsequent jobs would be deleted if picked up while the first one is executing.

The packages that you mentioned don't really avoid duplicate processing of jobs. This is best done with database queries.

Let's take https://github.com/mingalevme/illuminate-uqueue for example. It does a zadd nx operation to push a job onto the queue only if it doesn't already exist there. There are several problems with this approach:

  1. If the job has already finished processing and a second job is pushed after the first is complete, the second job will be processed even if it's a duplicate.
  2. Comparison of jobs is based on the payload. If the first job times out, it is pushed back onto the queue for another attempt. Say the second job is then pushed onto the queue: both are duplicates, but the payloads differ (because the attempt count differs), so both are pushed into and processed from the "unique" queue (see the sketch just below).
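To illustrate point 2, a rough sketch of the zadd nx behavior using the phpredis extension directly (assuming a recent phpredis; the key name and payload fields are made up for illustration):

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

// NX: only add the member if it is not already in the sorted set.
$first = json_encode(['job' => 'SendProductToAlgolia', 'id' => 1234, 'attempts' => 1]);
$redis->zAdd('queues:unique', ['NX'], microtime(true), $first);

// A timed-out job is pushed back with a bumped attempt count, so the
// payload differs and NX no longer recognizes it as a duplicate:
$retried = json_encode(['job' => 'SendProductToAlgolia', 'id' => 1234, 'attempts' => 2]);
$redis->zAdd('queues:unique', ['NX'], microtime(true), $retried); // added again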

If you truly need to avoid processing duplicate jobs, database queries are probably the best way to do so: mark a column as processed after handling, and before processing check whether it has already been processed. This approach does not suffer from any of the problems above.

TL;DR I don't recommend any of the packages listed above - the approach is faulty. A simple DB query will do the job!
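A minimal sketch of that DB-flag approach, assuming a hypothetical products.processed_at column and job class:

&lt;?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Support\Facades\DB;

class ProcessProduct implements ShouldQueue
{
    public $productId;

    public function __construct(int $productId)
    {
        $this->productId = $productId;
    }

    public function handle()
    {
        // Atomically claim the row: of several duplicate jobs, only one
        // will flip the flag; the rest see zero affected rows and bail.
        $claimed = DB::table('products')
            ->where('id', $this->productId)
            ->whereNull('processed_at')
            ->update(['processed_at' => now()]);

        if (! $claimed) {
            return; // a duplicate job already handled this product
        }

        // ... do the actual work exactly once
    }
}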


royduin commented Oct 19, 2020

Keeping track of the uniqueness with a database column isn't the way to go. What you're saying is that we add a boolean column to the products table, and after the first job runs we set that column to true. In the job we check the boolean and skip if it has already been set. So we can only update the product once? That's not the behavior I want.

The purpose of the uniqueness is to only have one job at a time in the queue, as they're all doing the same thing. But they may be executed more than once: if I update the product now, one job should be executed. If I update it tomorrow, another one should run. But if the queue is busy and I update the product 10 times, that job should only run once.

@paras-malhotra

Oh I see. So you don't mind the jobs being executed again; you just want to skip pushing duplicates while the queue is busy. In that case, the package meets your requirement.

Although, for the same use case (search indexing), I would recommend just dropping overlapping jobs with the WithoutOverlapping middleware and giving each job an expiry (retryUntil). If the queue is relatively free, the workers will pick up the jobs simultaneously and drop the overlapping ones. If the queue is busy, the jobs will time out after a certain time, reducing overhead.

This way you don't have to rely on a custom queue implementation. Also, note that zadd operations are more expensive than rpush.
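On the hypothetical UpdateSearchIndex job sketched earlier, that combination would look roughly like this; retryUntil is the standard job expiry hook, and the 10-minute window is an arbitrary example:

// Added to the hypothetical UpdateSearchIndex job class from the earlier sketch:

public function middleware()
{
    // Delete overlapping duplicates instead of releasing them back.
    return [(new WithoutOverlapping($this->productId))->dontRelease()];
}

public function retryUntil()
{
    // Discard the job entirely if it hasn't run within 10 minutes.
    return now()->addMinutes(10);
}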


royduin commented Oct 19, 2020

But it feels strange to push jobs onto the queue that don't have to be executed. Plus, the uniqueness isn't guaranteed: if the first job is finished, the next one will be executed, as this prevents overlapping, not duplicates. Nevertheless, thanks for your work and help! Hope to see uniqueness implemented in the framework itself someday.


paras-malhotra commented Oct 19, 2020

if the first job is finished, the next one will be executed

This will also happen with the custom queue implementation packages. When the first job is finished, it is deleted from the queue. The next one will still be executed. For this specific case, a DB query is the only solution.


royduin commented Oct 19, 2020

True, but then there won't be another job, as the uniqueness is handled when pushing jobs onto the queue. If another job makes it into the queue, that means the previous one is finished, so the new one should be executed.

@themsaid (Member)

I think what you need is this: https://laravel.com/docs/8.x/cache#managing-locks-across-processes

You acquire a lock before dispatching the job and release it when the job is done. New jobs won't get dispatched while the lock is still in place. Once the lock is released, new dispatches will come in.
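The pattern from those docs, sketched with a hypothetical IndexProduct job and lock key (the owner token is handed to the job so the lock can be released from another process; $this->lockOwner is a hypothetical property the job stores it in):

use Illuminate\Support\Facades\Cache;

// When dispatching:
$lock = Cache::lock("index-product:{$product->id}", 120);

if ($lock->get()) {
    // Pass the owner token along so the job can release the lock later.
    IndexProduct::dispatch($product, $lock->owner());
}

// Inside the job's handle() method, once the work is done:
Cache::restoreLock("index-product:{$this->product->id}", $this->lockOwner)->release();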


royduin commented Oct 22, 2020

That's an option when the dispatching is handled manually and only one event is queued, but when I have multiple listeners on an event, and a listener on multiple events, it won't work.

Example: product model with:

protected $dispatchesEvents = [
    'saved' => ProductSaved::class,
];

Within the EventServiceProvider there are multiple listeners on that event and also another event which triggers the listener:

protected $listen = [
    ProductSaved::class => [
        NotifyUsersAboutProductChanges::class,
        SendProductToAlgolia::class,
    ],
    ProductImageChanged::class => [
        SendProductToAlgolia::class,
    ],
];

I could dispatch the event manually from booted() on the model and wrap it with the lock, but then the lock applies to all events.

@themsaid (Member)

Dispatch a job inside the SendProductToAlgolia listener. So instead of having the index update inside the listener itself, let the listener dispatch a job that does that and use locks to control the uniqueness.
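A sketch of that listener shape, reusing the hypothetical IndexProduct job and lock key from above:

namespace App\Listeners;

use App\Jobs\IndexProduct;
use Illuminate\Support\Facades\Cache;

class SendProductToAlgolia
{
    public function handle($event)
    {
        $lock = Cache::lock("index-product:{$event->product->id}", 120);

        // If the lock is already held, an index job is pending; skip it.
        if ($lock->get()) {
            IndexProduct::dispatch($event->product, $lock->owner());
        }
    }
}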


royduin commented Oct 22, 2020

So un-queue the SendProductToAlgolia listener (otherwise it's filling up the queue for nothing) and move the code to a job. From the listener we only dispatch the job, wrapped in the lock. That should work, thanks for helping out! 💯

Don't you think it's cleaner to have this in the framework so people can just add a unique key to their jobs and listeners?

@themsaid (Member)

Possibly :) Will look into that when I get a chance. Or maybe @paras-malhotra will beat me, he has been on fire lately 💪🏽🔥

@themsaid (Member)

I wrote a quick post on that: https://divinglaravel.com/dispatching-unique-jobs-to-laravel-queues


paras-malhotra commented Oct 22, 2020

Very elegant solution indeed @themsaid, lock before dispatching and release after processing! 👍


mfn commented Oct 23, 2020

lock before dispatching and release after processing

Looks smart indeed!

Only thing to watch out for: you need some idea of how long it takes to process your queue, or how long jobs live in there, as a timeout is required (rightly so).

@atapatel

Do we need to create a unique key for every lock, according to the job?


elfeffe commented Oct 24, 2020

Do we need to create a unique key for every lock, according to the job?

If you want unique queues, you should create a unique key. You can't call it “products”, because then you won't queue any job for any product until the current one has finished.
It should be something like “product:update:1234”, where 1234 is the ID of the updated product.

I think the example

$lock = Cache::lock('productsIndex', 120);

is not very clear; it works because that user wanted to reindex everything on every product update.
In that case “productsIndex” is enough, but for most cases I think we have to add some unique key.
It depends on the use case.

@atapatel

@elfeffe so for each and every product I need to create a unique lock?
Something like
$lock = Cache::lock("product:update:{$product->id}", 120); ?


elfeffe commented Oct 26, 2020

@elfeffe so for each and every product I need to create a unique lock?
Something like
$lock = Cache::lock("product:update:{$product->id}", 120); ?

If you want to avoid duplicated jobs for that product update, your key must include the ID of the product.
The key should be as unique as you need it to be to avoid duplicates: in some cases product:update is enough, but in others you will need product:update:1234.

@atapatel


royduin commented Oct 27, 2020

That's not the same as discussed earlier.

@paras-malhotra

I thought about working on implementing this.

We could implement a function unique(string $uniqueBy, int $uniqueForSeconds) on PendingDispatch that acquires a lock and, if the lock is not acquired, cancels the PendingDispatch by preventing __destruct from dispatching.

That would take care of the lock. To forceRelease the lock, we can implement a ReleaseLockForUniqueJobs "after" middleware that does that.

So, in order to trigger a unique job dispatch, we'd have to say Job::dispatch()->unique(...) and add the ReleaseLockForUniqueJobs middleware to our job.

This two-step process just doesn't seem very intuitive / automatic. Having to remember to both call the unique function while dispatching and add the middleware seems "dirty".

It would be awesome if we could just pull in a trait like use UniqueJob; in our job class but I just don't see how we can implement the middleware this way.

Thoughts? If you guys can think of a better / more intuitive way, I'd be happy to chip in with a PR.

@paras-malhotra

I've submitted a PR with a cleaner approach (I hope 🤞).


paras-malhotra commented Nov 9, 2020

This feature will be released tomorrow. Docs here: laravel/docs#6558

@royduin, Taylor modified the PR based on your feedback to allow using a separate cache connection/driver for the unique job locks. You can check out the docs on how to do that.
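For reference, the API that shipped (as described in the linked docs) looks like this; UpdateSearchIndex mirrors the docs' example:

&lt;?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Support\Facades\Cache;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    public $product;

    // The unique lock is released after an hour even if the job never runs.
    public $uniqueFor = 3600;

    // No second job with the same unique ID is queued while one is pending.
    public function uniqueId()
    {
        return $this->product->id;
    }

    // Use a dedicated cache connection/driver for the unique job locks.
    public function uniqueVia()
    {
        return Cache::driver('redis');
    }
}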

Thanks @themsaid and @royduin for your valuable feedback on the PR! 👍


royduin commented Nov 10, 2020

Awesome! Thank you for your work 🚀

royduin closed this as completed Nov 10, 2020