beforeEnqueue/afterEnqueue not called upon Retry #721

Open
ehainer opened this issue Dec 12, 2021 · 6 comments

Comments

@ehainer

ehainer commented Dec 12, 2021

It's pretty obvious why neither of these events is emitted when the Retry plugin re-enqueues a job:

The Queue#enqueue method emits both, but the logic for the retry plugin includes a call to Queue#enqueueIn which in turn calls Queue#enqueueAt, which does not emit either event.
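The call chain described above can be illustrated with a toy model. This is a minimal sketch, not the library's code: `ToyQueue`, its `events` array, and its in-memory `work` and `delayed` lists are made up for illustration. Only the method names `enqueue`, `enqueueIn`, and `enqueueAt` come from the description above.

```javascript
// Toy model of the call chain described above; NOT the library's real code.
class ToyQueue {
  constructor() {
    this.events = [];  // hook names, in the order they fired
    this.work = [];    // jobs ready to run
    this.delayed = []; // jobs scheduled for later
  }

  enqueue(func, args) {
    this.events.push("beforeEnqueue");
    this.work.push({ func, args });
    this.events.push("afterEnqueue");
  }

  enqueueAt(timestamp, func, args) {
    // No hooks fire on this path.
    this.delayed.push({ timestamp, func, args });
  }

  enqueueIn(delayMs, func, args) {
    this.enqueueAt(Date.now() + delayMs, func, args);
  }
}

const queue = new ToyQueue();
queue.enqueue("add", [1, 2]);         // first attempt: hooks fire
queue.enqueueIn(1000, "add", [1, 2]); // the path a retry takes in this model
console.log(queue.events);            // hooks recorded once, from enqueue() only
```

In this model the second call lands in the delayed list without touching either hook, which matches the behavior reported here.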

The main issue I'm trying to overcome is to manipulate the args passed to perform before/after a job runs. Specifically, I'd like to pass a Sequelize model instance as an arg, but normally when it's serialized you obviously lose the instance and are left with just a plain object. To that end, I wanted to implement an interceptor like so:

beforeEnqueue() {
  // iterate through args, convert anything that is instanceof Model to 'gid://<Model Name>/<ID>'
}

beforePerform() {
  // iterate through args, anything that starts with 'gid://' is converted back to the instance
}

The above works fine on the first attempt at the job, but if the job fails and is retried, beforeEnqueue is not called and the resulting arg is serialized normally, and I have just a sad plain object 😦
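To make the difference concrete, here is a minimal sketch of the round trip. Everything in it is assumed for illustration: `User` stands in for a Sequelize model, `registry` stands in for a database lookup, and `toGid`/`fromGid` are hypothetical helpers, not part of any library.

```javascript
// Hypothetical stand-in for a Sequelize model; only the shape matters here.
class User {
  constructor(id) {
    this.id = id;
  }
}

// Hypothetical in-memory "database" used to resolve IDs back to instances.
const registry = { User: new Map([[7, new User(7)]]) };

// Model instance -> 'gid://<Model Name>/<ID>'; everything else passes through.
const toGid = (arg) => (arg instanceof User ? `gid://User/${arg.id}` : arg);

// 'gid://<Model Name>/<ID>' -> model instance; everything else passes through.
const fromGid = (arg) => {
  if (typeof arg !== "string" || !arg.startsWith("gid://")) return arg;
  const [modelName, id] = arg.slice("gid://".length).split("/");
  return registry[modelName].get(Number(id));
};

const args = [new User(7), "hello"];

// Plain serialization (the retry case): the instance degrades to a plain object.
const plain = JSON.parse(JSON.stringify(args));
console.log(plain[0] instanceof User); // false

// With the interceptors (the first-attempt case): the instance is recovered.
const restored = JSON.parse(JSON.stringify(args.map(toGid))).map(fromGid);
console.log(restored[0] instanceof User); // true
```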

Unless there is an alternative, glaringly obvious way to achieve my goal that I'm missing... Open to ideas.

Thanks!

@evantahler
Member

evantahler commented Dec 12, 2021

Welcome @ehainer!

First, I love that auto-serialization pattern for Sequelize objects. At work we manually pluck instance IDs when enqueueing jobs... this would be a nice change to our internal APIs.

What we should probably do in this library is add two more plugin hooks called beforeDelayEnqueue and afterDelayEnqueue. In the most pedantic sense, "beforeEnqueue" means "before adding the job to the work queue", which doesn't really happen until later in the delayed case: it happens once the scheduler determines that it's time to work this job and moves it from the "delayed queue" to the "work queue". Adding these 2 new plugin hooks would allow you to serialize your objects in both cases.

You would then do:

beforeDelayEnqueue() {
  // iterate through args, convert anything that is instanceof Model to 'gid://<Model Name>/<ID>'
}

beforeEnqueue() {
  // iterate through args, convert anything that is instanceof Model to 'gid://<Model Name>/<ID>'
}

beforePerform() {
  // iterate through args, anything that starts with 'gid://' is converted back to the instance
}

Are you up for adding these new plugin hooks in a pull request? It would be really useful to get this back in the library for others to use!

@ehainer
Author

ehainer commented Dec 13, 2021

Ha! Wish I could take credit for that pattern. It comes from Rails.

Your thinking sounds alright to me. A clear separation of "enqueued first" vs "enqueued again" seems the most logical. However, I've been working on a PR for a few hours now, and actually think this could be solved another way. The main issue I'm encountering is that args can't be modified from within a plugin due to this block of code (in worker.ts):

for (const i in callableArgs) {
  if (typeof callableArgs[i] === "object" && callableArgs[i] !== null) {
    Object.freeze(callableArgs[i]); // <--- Specifically, this
  }
}

Freezing the args appeared to be what was making this so troublesome. By simply removing that, I could then write something like this:

beforeEnqueue() {
  this.args = globalize(this.args) // Deep scans arrays or objects and converts the model instance to 'gid://...'
  return true
}

async beforePerform() {
  // Need to parse stringified args to avoid manipulating the original array
  this.args = await unglobalize(JSON.parse(JSON.stringify(this.args))) // unglobalize just reverses the above, gid://... -> Model
  return true
}

This way, the arguments received by perform are what I'd expect, but there's no constant back and forth of globalize/unglobalize during retries. The args, at least from the perspective of redis, are manipulated once before enqueueing, and not after that.

I can certainly understand the use of Object.freeze here, since it avoids potentially altering args in a way that could be detrimental (e.g., this.args = 'something completely unrelated'), but it seems that the existing functionality is actually completely fine so long as users have the ability to alter the actual args.
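For reference, here is a minimal sketch of what the freeze loop quoted from worker.ts does and does not prevent, using made-up sample args. The `attempt` helper is hypothetical; it swallows the TypeError that strict mode raises when writing to a frozen object, so the result is the same in strict and sloppy mode.

```javascript
// Same loop as the worker.ts excerpt above, applied to sample args.
const callableArgs = [{ id: 1, nested: { count: 0 } }, "text"];
for (const i in callableArgs) {
  if (typeof callableArgs[i] === "object" && callableArgs[i] !== null) {
    Object.freeze(callableArgs[i]);
  }
}

// Run a mutation, ignoring the TypeError strict mode raises on frozen objects.
const attempt = (mutation) => {
  try {
    mutation();
  } catch (err) {
    // Only reached in strict mode; in sloppy mode the write is silently dropped.
  }
};

attempt(() => { callableArgs[0].id = 2; });           // blocked: property of a frozen arg
attempt(() => { callableArgs[0].nested.count = 5; }); // allowed: freeze is shallow
attempt(() => { callableArgs[1] = "replaced"; });     // allowed: the array is not frozen

console.log(callableArgs[0].id);           // 1
console.log(callableArgs[0].nested.count); // 5
console.log(callableArgs[1]);              // "replaced"
```

Judging only from this excerpt, the loop protects the top-level properties of each object arg, not the array that holds them. Whether a plugin can replace `this.args` wholesale depends on worker code not shown in this thread.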

Thoughts? I'm happy to keep going on the PR, I just don't want to add unnecessary complexity when the alternative is removing 5 lines of code and hoping people don't alter args in a way they shouldn't. Just don't know enough about this package to know if that would break something elsewhere.

@evantahler
Member

I think I'd like to keep Object.freeze - the immutability of what's passed to the job probably should be maintained. It's an old 2015 issue, but it prevents these kinds of problems - #99

@ehainer
Author

ehainer commented Dec 29, 2021

Been messing with this for a while now, found some issues in the previous PR which is why I closed it. The beforeDelayEnqueue/afterDelayEnqueue are easy enough, but Object.freeze still presents problems since the args are all frozen by the time they are fed into afterPerform/beforeEnqueue/beforeDelayEnqueue.

Would it work to change Object.freeze to Object.seal?

Even after looking at #99 it's a little unclear whether the issue was with the addition of new properties to args or if it's a problem that the value of existing properties changes. Object.seal would prevent new properties, but still allow for existing property values to be changed.
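The difference between the two calls can be checked with a short, standalone sketch that is not tied to the library. The `safely` helper is hypothetical and only swallows the TypeError that strict mode raises on a rejected write.

```javascript
const sealed = Object.seal({ id: 1 });
const frozen = Object.freeze({ id: 1 });

// Run a mutation, ignoring the TypeError strict mode raises when it is rejected.
const safely = (mutation) => {
  try {
    mutation();
  } catch (err) {
    // Only reached in strict mode.
  }
};

safely(() => { sealed.id = 2; });       // allowed: existing values stay writable
safely(() => { sealed.extra = true; }); // blocked: no new properties
safely(() => { delete sealed.id; });    // blocked: no deletions
safely(() => { frozen.id = 2; });       // blocked: existing values are read-only

console.log(sealed.id);               // 2
console.log("extra" in sealed);       // false
console.log(frozen.id);               // 1
console.log(Object.isSealed(frozen)); // true: a frozen object is also sealed
```

Like Object.freeze, Object.seal is shallow, so nested objects inside an arg stay fully mutable under either call.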

@evantahler
Member

The work you did in #737 was really good - I hope you kept a backup around!

Maybe the thing to do is to make freezing the args optional at a higher level - either they are frozen or not, and the default would be frozen to keep the current default behavior. Perhaps when defining a Job there's a new option:

const jobs = {
  add: {
    freezeArgs: false, // <--- HERE
    plugins: [Plugins.JobLock],
    pluginOptions: {
      JobLock: { reEnqueue: true },
    },
    perform: async (a, b) => {
      const answer = a + b;
      return answer;
    },
  },
  subtract: {
    perform: (a, b) => {
      const answer = a - b;
      return answer;
    },
  },
};

So the Add job could have its args modified by a plugin, and the Subtract job could not. If this was implemented first, would this make the work in #737 work the way you want?
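A minimal sketch of how such an opt-out might behave, assuming the option is read from the job definition before args are handed to plugins. `freezeArgs` is only a proposal in this thread, and `prepareArgs` is a hypothetical helper, not a function in the library.

```javascript
// Hypothetical helper: freezes object args unless the job opts out.
// `freezeArgs` is a proposed option, not an existing one.
function prepareArgs(jobDefinition, args) {
  // Frozen by default, so existing jobs keep the current behavior.
  if (jobDefinition.freezeArgs === false) return args;
  for (const arg of args) {
    if (typeof arg === "object" && arg !== null) {
      Object.freeze(arg);
    }
  }
  return args;
}

const jobDefinitions = {
  add: { freezeArgs: false, perform: async (a, b) => a + b },
  subtract: { perform: (a, b) => a - b },
};

const addArgs = prepareArgs(jobDefinitions.add, [{ n: 1 }]);
const subtractArgs = prepareArgs(jobDefinitions.subtract, [{ n: 1 }]);

console.log(Object.isFrozen(addArgs[0]));      // false
console.log(Object.isFrozen(subtractArgs[0])); // true
```

Treating only an explicit `false` as the opt-out means a job that omits the option is frozen exactly as before.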

@evantahler
Member

Ping! Just checking in
