Skip to content

Commit

Permalink
enqueue all jobs using a enqueueUsing method
Browse files Browse the repository at this point in the history
  • Loading branch information
themsaid committed Nov 30, 2020
1 parent 7ca3df6 commit a6b13a9
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 21 deletions.
18 changes: 11 additions & 7 deletions src/Illuminate/Queue/BeanstalkdQueue.php
Expand Up @@ -77,7 +77,9 @@ public function size($queue = null)
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
return $this->enqueueUsing($job, function () use ($data, $queue, $job) {
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
});
}

/**
Expand Down Expand Up @@ -108,12 +110,14 @@ public function later($delay, $job, $data = '', $queue = null)
{
$pheanstalk = $this->pheanstalk->useTube($this->getQueue($queue));

return $pheanstalk->put(
$this->createPayload($job, $this->getQueue($queue), $data),
Pheanstalk::DEFAULT_PRIORITY,
$this->secondsUntil($delay),
$this->timeToRun
);
return $this->enqueueUsing($job, function () use ($delay, $pheanstalk, $data, $queue, $job) {
return $pheanstalk->put(
$this->createPayload($job, $this->getQueue($queue), $data),
Pheanstalk::DEFAULT_PRIORITY,
$this->secondsUntil($delay),
$this->timeToRun
);
});
}

/**
Expand Down
16 changes: 10 additions & 6 deletions src/Illuminate/Queue/DatabaseQueue.php
Expand Up @@ -80,9 +80,11 @@ public function size($queue = null)
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushToDatabase($queue, $this->createPayload(
$job, $this->getQueue($queue), $data
));
return $this->enqueueUsing($job, function () use ($data, $queue, $job) {
return $this->pushToDatabase($queue, $this->createPayload(
$job, $this->getQueue($queue), $data
));
});
}

/**
Expand All @@ -109,9 +111,11 @@ public function pushRaw($payload, $queue = null, array $options = [])
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->pushToDatabase($queue, $this->createPayload(
$job, $this->getQueue($queue), $data
), $delay);
return $this->enqueueUsing($job, function () use ($delay, $data, $queue, $job) {
return $this->pushToDatabase($queue, $this->createPayload(
$job, $this->getQueue($queue), $data
), $delay);
});
}

/**
Expand Down
12 changes: 12 additions & 0 deletions src/Illuminate/Queue/Queue.php
Expand Up @@ -256,6 +256,18 @@ protected function withCreatePayloadHooks($queue, array $payload)
return $payload;
}

/**
* Enqueue a job using the given callback.
*
* @param callable $callback
* @param \Closure|string|object $job
* @return mixed
*/
protected function enqueueUsing($job, $callback)
{
return call_user_func($callback);
}

/**
* Get the connection name for the queue.
*
Expand Down
8 changes: 6 additions & 2 deletions src/Illuminate/Queue/RedisQueue.php
Expand Up @@ -108,7 +108,9 @@ public function bulk($jobs, $data = '', $queue = null)
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
return $this->enqueueUsing($job, function () use ($data, $queue, $job) {
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
});
}

/**
Expand Down Expand Up @@ -140,7 +142,9 @@ public function pushRaw($payload, $queue = null, array $options = [])
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->laterRaw($delay, $this->createPayload($job, $this->getQueue($queue), $data), $queue);
return $this->enqueueUsing($job, function () use ($delay, $data, $queue, $job) {
return $this->laterRaw($delay, $this->createPayload($job, $this->getQueue($queue), $data), $queue);
});
}

/**
Expand Down
16 changes: 10 additions & 6 deletions src/Illuminate/Queue/SqsQueue.php
Expand Up @@ -83,7 +83,9 @@ public function size($queue = null)
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $queue ?: $this->default, $data), $queue);
return $this->enqueueUsing($job, function () use ($data, $queue, $job) {
return $this->pushRaw($this->createPayload($job, $queue ?: $this->default, $data), $queue);
});
}

/**
Expand Down Expand Up @@ -112,11 +114,13 @@ public function pushRaw($payload, $queue = null, array $options = [])
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue),
'MessageBody' => $this->createPayload($job, $queue ?: $this->default, $data),
'DelaySeconds' => $this->secondsUntil($delay),
])->get('MessageId');
return $this->enqueueUsing($job, function () use ($delay, $data, $queue, $job) {
return $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue),
'MessageBody' => $this->createPayload($job, $queue ?: $this->default, $data),
'DelaySeconds' => $this->secondsUntil($delay),
])->get('MessageId');
});
}

/**
Expand Down

0 comments on commit a6b13a9

Please sign in to comment.