Skip to content

Commit

Permalink
refactor enqueueUsing calls (#35437)
Browse files Browse the repository at this point in the history
  • Loading branch information
themsaid committed Dec 1, 2020
1 parent 58aeab2 commit c6ececf
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 42 deletions.
36 changes: 23 additions & 13 deletions src/Illuminate/Queue/BeanstalkdQueue.php
Expand Up @@ -77,9 +77,15 @@ public function size($queue = null)
*/
public function push($job, $data = '', $queue = null)
{
return $this->enqueueUsing($job, function () use ($job, $data, $queue) {
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
});
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
$queue,
null,
function ($payload, $queue) {
return $this->pushRaw($payload, $queue);
}
);
}

/**
Expand Down Expand Up @@ -108,16 +114,20 @@ public function pushRaw($payload, $queue = null, array $options = [])
*/
public function later($delay, $job, $data = '', $queue = null)
{
$pheanstalk = $this->pheanstalk->useTube($this->getQueue($queue));

return $this->enqueueUsing($job, function () use ($delay, $pheanstalk, $job, $data, $queue) {
return $pheanstalk->put(
$this->createPayload($job, $this->getQueue($queue), $data),
Pheanstalk::DEFAULT_PRIORITY,
$this->secondsUntil($delay),
$this->timeToRun
);
});
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
$queue,
$delay,
function ($payload, $queue, $delay) {
return $this->pheanstalk->useTube($this->getQueue($queue))->put(
$payload,
Pheanstalk::DEFAULT_PRIORITY,
$this->secondsUntil($delay),
$this->timeToRun
);
}
);
}

/**
Expand Down
28 changes: 18 additions & 10 deletions src/Illuminate/Queue/DatabaseQueue.php
Expand Up @@ -80,11 +80,15 @@ public function size($queue = null)
*/
public function push($job, $data = '', $queue = null)
{
return $this->enqueueUsing($job, function () use ($job, $data, $queue) {
return $this->pushToDatabase($queue, $this->createPayload(
$job, $this->getQueue($queue), $data
));
});
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
$queue,
null,
function ($payload, $queue) {
return $this->pushToDatabase($queue, $payload);
}
);
}

/**
Expand All @@ -111,11 +115,15 @@ public function pushRaw($payload, $queue = null, array $options = [])
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->enqueueUsing($job, function () use ($delay, $job, $data, $queue) {
return $this->pushToDatabase($queue, $this->createPayload(
$job, $this->getQueue($queue), $data
), $delay);
});
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
$queue,
null,
function ($payload, $queue, $delay) {
return $this->pushToDatabase($queue, $payload, $delay);
}
);
}

/**
Expand Down
9 changes: 6 additions & 3 deletions src/Illuminate/Queue/Queue.php
Expand Up @@ -259,13 +259,16 @@ protected function withCreatePayloadHooks($queue, array $payload)
/**
* Enqueue a job using the given callback.
*
* @param callable $callback
* @param \Closure|string|object $job
* @param string $payload
* @param string $queue
* @param \DateTimeInterface|\DateInterval|int|null $delay
* @param callable $callback
* @return mixed
*/
protected function enqueueUsing($job, $callback)
protected function enqueueUsing($job, $payload, $queue, $delay, $callback)
{
return $callback();
return $callback($payload, $queue, $delay);
}

/**
Expand Down
24 changes: 18 additions & 6 deletions src/Illuminate/Queue/RedisQueue.php
Expand Up @@ -108,9 +108,15 @@ public function bulk($jobs, $data = '', $queue = null)
*/
public function push($job, $data = '', $queue = null)
{
return $this->enqueueUsing($job, function () use ($job, $data, $queue) {
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
});
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
$queue,
null,
function ($payload, $queue) {
return $this->pushRaw($payload, $queue);
}
);
}

/**
Expand Down Expand Up @@ -142,9 +148,15 @@ public function pushRaw($payload, $queue = null, array $options = [])
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->enqueueUsing($job, function () use ($delay, $job, $data, $queue) {
return $this->laterRaw($delay, $this->createPayload($job, $this->getQueue($queue), $data), $queue);
});
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
$queue,
$delay,
function ($payload, $queue, $delay) {
return $this->laterRaw($delay, $payload, $queue);
}
);
}

/**
Expand Down
32 changes: 22 additions & 10 deletions src/Illuminate/Queue/SqsQueue.php
Expand Up @@ -83,9 +83,15 @@ public function size($queue = null)
*/
public function push($job, $data = '', $queue = null)
{
return $this->enqueueUsing($job, function () use ($job, $data, $queue) {
return $this->pushRaw($this->createPayload($job, $queue ?: $this->default, $data), $queue);
});
return $this->enqueueUsing(
$job,
$this->createPayload($job, $queue ?: $this->default, $data),
$queue,
null,
function ($payload, $queue) {
return $this->pushRaw($payload, $queue);
}
);
}

/**
Expand Down Expand Up @@ -114,13 +120,19 @@ public function pushRaw($payload, $queue = null, array $options = [])
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->enqueueUsing($job, function () use ($delay, $job, $data, $queue) {
return $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue),
'MessageBody' => $this->createPayload($job, $queue ?: $this->default, $data),
'DelaySeconds' => $this->secondsUntil($delay),
])->get('MessageId');
});
return $this->enqueueUsing(
$job,
$this->createPayload($job, $queue ?: $this->default, $data),
$queue,
$delay,
function ($payload, $queue, $delay) {
return $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue),
'MessageBody' => $payload,
'DelaySeconds' => $this->secondsUntil($delay),
])->get('MessageId');
}
);
}

/**
Expand Down

0 comments on commit c6ececf

Please sign in to comment.