From a6b13a978a5ab25a9b96c30746b841b90d27ea86 Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Mon, 30 Nov 2020 12:11:11 +0200 Subject: [PATCH] enqueue all jobs using a enqueueUsing method --- src/Illuminate/Queue/BeanstalkdQueue.php | 18 +++++++++++------- src/Illuminate/Queue/DatabaseQueue.php | 16 ++++++++++------ src/Illuminate/Queue/Queue.php | 12 ++++++++++++ src/Illuminate/Queue/RedisQueue.php | 8 ++++++-- src/Illuminate/Queue/SqsQueue.php | 16 ++++++++++------ 5 files changed, 49 insertions(+), 21 deletions(-) diff --git a/src/Illuminate/Queue/BeanstalkdQueue.php b/src/Illuminate/Queue/BeanstalkdQueue.php index 49c36bdac07f..2d8f4275a629 100755 --- a/src/Illuminate/Queue/BeanstalkdQueue.php +++ b/src/Illuminate/Queue/BeanstalkdQueue.php @@ -77,7 +77,9 @@ public function size($queue = null) */ public function push($job, $data = '', $queue = null) { - return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue); + return $this->enqueueUsing($job, function () use ($data, $queue, $job) { + return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue); + }); } /** @@ -108,12 +110,14 @@ public function later($delay, $job, $data = '', $queue = null) { $pheanstalk = $this->pheanstalk->useTube($this->getQueue($queue)); - return $pheanstalk->put( - $this->createPayload($job, $this->getQueue($queue), $data), - Pheanstalk::DEFAULT_PRIORITY, - $this->secondsUntil($delay), - $this->timeToRun - ); + return $this->enqueueUsing($job, function () use ($delay, $pheanstalk, $data, $queue, $job) { + return $pheanstalk->put( + $this->createPayload($job, $this->getQueue($queue), $data), + Pheanstalk::DEFAULT_PRIORITY, + $this->secondsUntil($delay), + $this->timeToRun + ); + }); } /** diff --git a/src/Illuminate/Queue/DatabaseQueue.php b/src/Illuminate/Queue/DatabaseQueue.php index 89fb91cb3038..7a760d35d299 100644 --- a/src/Illuminate/Queue/DatabaseQueue.php +++ b/src/Illuminate/Queue/DatabaseQueue.php @@ -80,9 +80,11 @@ public function size($queue = null) */ public function push($job, $data = '', $queue = null) { - return $this->pushToDatabase($queue, $this->createPayload( - $job, $this->getQueue($queue), $data - )); + return $this->enqueueUsing($job, function () use ($data, $queue, $job) { + return $this->pushToDatabase($queue, $this->createPayload( + $job, $this->getQueue($queue), $data + )); + }); } /** @@ -109,9 +111,11 @@ public function pushRaw($payload, $queue = null, array $options = []) */ public function later($delay, $job, $data = '', $queue = null) { - return $this->pushToDatabase($queue, $this->createPayload( - $job, $this->getQueue($queue), $data - ), $delay); + return $this->enqueueUsing($job, function () use ($delay, $data, $queue, $job) { + return $this->pushToDatabase($queue, $this->createPayload( + $job, $this->getQueue($queue), $data + ), $delay); + }); } /** diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index 2cc64a60e3e4..f68287388c85 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -256,6 +256,18 @@ protected function withCreatePayloadHooks($queue, array $payload) return $payload; } + /** + * Enqueue a job using the given callback. + * + * @param callable $callback + * @param \Closure|string|object $job + * @return mixed + */ + protected function enqueueUsing($job, $callback) + { + return call_user_func($callback); + } + /** * Get the connection name for the queue. * diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index 19fc07589497..6584d34090a5 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -108,7 +108,9 @@ public function bulk($jobs, $data = '', $queue = null) */ public function push($job, $data = '', $queue = null) { - return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue); + return $this->enqueueUsing($job, function () use ($data, $queue, $job) { + return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue); + }); } /** @@ -140,7 +142,9 @@ public function pushRaw($payload, $queue = null, array $options = []) */ public function later($delay, $job, $data = '', $queue = null) { - return $this->laterRaw($delay, $this->createPayload($job, $this->getQueue($queue), $data), $queue); + return $this->enqueueUsing($job, function () use ($delay, $data, $queue, $job) { + return $this->laterRaw($delay, $this->createPayload($job, $this->getQueue($queue), $data), $queue); + }); } /** diff --git a/src/Illuminate/Queue/SqsQueue.php b/src/Illuminate/Queue/SqsQueue.php index 8ec5fd110b45..d695da742978 100755 --- a/src/Illuminate/Queue/SqsQueue.php +++ b/src/Illuminate/Queue/SqsQueue.php @@ -83,7 +83,9 @@ public function size($queue = null) */ public function push($job, $data = '', $queue = null) { - return $this->pushRaw($this->createPayload($job, $queue ?: $this->default, $data), $queue); + return $this->enqueueUsing($job, function () use ($data, $queue, $job) { + return $this->pushRaw($this->createPayload($job, $queue ?: $this->default, $data), $queue); + }); } /** @@ -112,11 +114,13 @@ public function pushRaw($payload, $queue = null, array $options = []) */ public function later($delay, $job, $data = '', $queue = null) { - return $this->sqs->sendMessage([ - 'QueueUrl' => $this->getQueue($queue), - 'MessageBody' => $this->createPayload($job, $queue ?: $this->default, $data), - 'DelaySeconds' => $this->secondsUntil($delay), - ])->get('MessageId'); + return $this->enqueueUsing($job, function () use ($delay, $data, $queue, $job) { + return $this->sqs->sendMessage([ + 'QueueUrl' => $this->getQueue($queue), + 'MessageBody' => $this->createPayload($job, $queue ?: $this->default, $data), + 'DelaySeconds' => $this->secondsUntil($delay), + ])->get('MessageId'); + }); } /**