Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] Delay pushing jobs to queue until database transactions are committed #35422

Merged
merged 10 commits into from Dec 9, 2020
1 change: 1 addition & 0 deletions src/Illuminate/Broadcasting/BroadcastEvent.php
Expand Up @@ -46,6 +46,7 @@ public function __construct($event)
$this->event = $event;
$this->tries = property_exists($event, 'tries') ? $event->tries : null;
$this->timeout = property_exists($event, 'timeout') ? $event->timeout : null;
$this->dispatchAfterCommit = property_exists($event, 'dispatchAfterCommit') ? $event->dispatchAfterCommit : null;
taylorotwell marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
31 changes: 31 additions & 0 deletions src/Illuminate/Bus/Queueable.php
Expand Up @@ -52,6 +52,13 @@ trait Queueable
*/
public $delay;

/**
* Indicate the job should be dispatched after database transactions.
*
* @var bool|null
*/
public $dispatchAfterCommit;

/**
* The middleware the job should be dispatched through.
*
Expand Down Expand Up @@ -133,6 +140,30 @@ public function delay($delay)
return $this;
}

/**
* Indicate that the job should be dispatched after database transactions.
*
* @return $this
*/
public function afterCommit()
{
$this->dispatchAfterCommit = true;

return $this;
}

/**
* Indicate that the job should be dispatched before database transactions.
*
* @return $this
*/
public function beforeCommit()
{
$this->dispatchAfterCommit = false;

return $this;
}

/**
* Specify the middleware the job should be dispatched through.
*
Expand Down
2 changes: 2 additions & 0 deletions src/Illuminate/Events/Dispatcher.php
Expand Up @@ -557,6 +557,8 @@ protected function propagateListenerOptions($listener, $job)
$job->backoff = method_exists($listener, 'backoff')
? $listener->backoff() : ($listener->backoff ?? null);
$job->timeout = $listener->timeout ?? null;
$job->dispatchAfterCommit = property_exists($listener, 'dispatchAfterCommit')
? $listener->dispatchAfterCommit : null;
$job->retryUntil = method_exists($listener, 'retryUntil')
? $listener->retryUntil() : null;
});
Expand Down
24 changes: 24 additions & 0 deletions src/Illuminate/Foundation/Bus/PendingDispatch.php
Expand Up @@ -99,6 +99,30 @@ public function delay($delay)
return $this;
}

/**
* Indicate that the job should be dispatched after database transactions.
*
* @return $this
*/
public function afterCommit()
{
$this->job->afterCommit();

return $this;
}

/**
* Indicate that the job should be dispatched before database transactions.
*
* @return $this
*/
public function beforeCommit()
{
$this->job->beforeCommit();

return $this;
}

/**
* Set the jobs that should run if this job is successful.
*
Expand Down
4 changes: 4 additions & 0 deletions src/Illuminate/Mail/SendQueuedMailable.php
Expand Up @@ -2,11 +2,14 @@

namespace Illuminate\Mail;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Mail\Factory as MailFactory;
use Illuminate\Contracts\Mail\Mailable as MailableContract;

class SendQueuedMailable
{
use Queueable;

/**
* The mailable message instance.
*
Expand Down Expand Up @@ -39,6 +42,7 @@ public function __construct(MailableContract $mailable)
$this->mailable = $mailable;
$this->tries = property_exists($mailable, 'tries') ? $mailable->tries : null;
$this->timeout = property_exists($mailable, 'timeout') ? $mailable->timeout : null;
$this->dispatchAfterCommit = property_exists($mailable, 'dispatchAfterCommit') ? $mailable->dispatchAfterCommit : null;
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/Illuminate/Notifications/SendQueuedNotifications.php
Expand Up @@ -64,6 +64,7 @@ public function __construct($notifiables, $notification, array $channels = null)
$this->notifiables = $this->wrapNotifiables($notifiables);
$this->tries = property_exists($notification, 'tries') ? $notification->tries : null;
$this->timeout = property_exists($notification, 'timeout') ? $notification->timeout : null;
$this->dispatchAfterCommit = property_exists($notification, 'dispatchAfterCommit') ? $notification->dispatchAfterCommit : null;
}

/**
Expand Down
8 changes: 7 additions & 1 deletion src/Illuminate/Queue/BeanstalkdQueue.php
Expand Up @@ -44,14 +44,20 @@ class BeanstalkdQueue extends Queue implements QueueContract
* @param string $default
* @param int $timeToRun
* @param int $blockFor
* @param bool $dispatchAfterCommit
* @return void
*/
public function __construct(Pheanstalk $pheanstalk, $default, $timeToRun, $blockFor = 0)
public function __construct(Pheanstalk $pheanstalk,
$default,
$timeToRun,
$blockFor = 0,
$dispatchAfterCommit = false)
{
$this->default = $default;
$this->blockFor = $blockFor;
$this->timeToRun = $timeToRun;
$this->pheanstalk = $pheanstalk;
$this->dispatchAfterCommit = $dispatchAfterCommit;
}

/**
Expand Down
3 changes: 2 additions & 1 deletion src/Illuminate/Queue/Connectors/BeanstalkdConnector.php
Expand Up @@ -20,7 +20,8 @@ public function connect(array $config)
$this->pheanstalk($config),
$config['queue'],
$config['retry_after'] ?? Pheanstalk::DEFAULT_TTR,
$config['block_for'] ?? 0
$config['block_for'] ?? 0,
$config['after_commit'] ?? null
);
}

Expand Down
3 changes: 2 additions & 1 deletion src/Illuminate/Queue/Connectors/DatabaseConnector.php
Expand Up @@ -37,7 +37,8 @@ public function connect(array $config)
$this->connections->connection($config['connection'] ?? null),
$config['table'],
$config['queue'],
$config['retry_after'] ?? 60
$config['retry_after'] ?? 60,
$config['after_commit'] ?? null
);
}
}
3 changes: 2 additions & 1 deletion src/Illuminate/Queue/Connectors/RedisConnector.php
Expand Up @@ -46,7 +46,8 @@ public function connect(array $config)
$this->redis, $config['queue'],
$config['connection'] ?? $this->connection,
$config['retry_after'] ?? 60,
$config['block_for'] ?? null
$config['block_for'] ?? null,
$config['after_commit'] ?? null
);
}
}
6 changes: 5 additions & 1 deletion src/Illuminate/Queue/Connectors/SqsConnector.php
Expand Up @@ -23,7 +23,11 @@ public function connect(array $config)
}

return new SqsQueue(
new SqsClient($config), $config['queue'], $config['prefix'] ?? '', $config['suffix'] ?? ''
new SqsClient($config),
$config['queue'],
$config['prefix'] ?? '',
$config['suffix'] ?? '',
$config['after_commit'] ?? null
);
}

Expand Down
49 changes: 49 additions & 0 deletions src/Illuminate/Queue/Queue.php
Expand Up @@ -34,6 +34,13 @@ abstract class Queue
*/
protected static $createPayloadCallbacks = [];

/**
* Indicate the job should be dispatched after database transactions.
*
* @var bool|null
*/
protected $dispatchAfterCommit;

/**
* Push a new job onto the queue.
*
Expand Down Expand Up @@ -268,9 +275,51 @@ protected function withCreatePayloadHooks($queue, array $payload)
*/
protected function enqueueUsing($job, $payload, $queue, $delay, $callback)
{
if ($this->shouldDispatchAfterCommit($job) &&
$this->container->bound('db.transactions')) {
return $this->container->make('db.transactions')->addCallback(
$this->afterCommitCallback($payload, $queue, $delay, $callback)
);
}

return $callback($payload, $queue, $delay);
}

/**
* Determine if the job should be dispatched after database transactions.
*
* @param \Closure|string|object $job
* @return bool
*/
protected function shouldDispatchAfterCommit($job)
{
if (is_object($job) && isset($job->dispatchAfterCommit)) {
return $job->dispatchAfterCommit;
}

if (isset($this->dispatchAfterCommit)) {
return $this->dispatchAfterCommit;
}

return false;
}

/**
* Create the after commit callback.
*
* @param string $payload
* @param string $queue
* @param \DateTimeInterface|\DateInterval|int|null $delay
* @param callable $callback
* @return callable
*/
protected function afterCommitCallback($payload, $queue, $delay, $callback)
{
return function () use ($delay, $queue, $payload, $callback) {
return $callback($payload, $queue, $delay);
};
}

/**
* Get the connection name for the queue.
*
Expand Down
9 changes: 8 additions & 1 deletion src/Illuminate/Queue/RedisQueue.php
Expand Up @@ -53,15 +53,22 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue
* @param string|null $connection
* @param int $retryAfter
* @param int|null $blockFor
* @param bool $dispatchAfterCommit
* @return void
*/
public function __construct(Redis $redis, $default = 'default', $connection = null, $retryAfter = 60, $blockFor = null)
public function __construct(Redis $redis,
$default = 'default',
$connection = null,
$retryAfter = 60,
$blockFor = null,
$dispatchAfterCommit = false)
{
$this->redis = $redis;
$this->default = $default;
$this->blockFor = $blockFor;
$this->connection = $connection;
$this->retryAfter = $retryAfter;
$this->dispatchAfterCommit = $dispatchAfterCommit;
}

/**
Expand Down
8 changes: 7 additions & 1 deletion src/Illuminate/Queue/SqsQueue.php
Expand Up @@ -45,14 +45,20 @@ class SqsQueue extends Queue implements QueueContract, ClearableQueue
* @param string $default
* @param string $prefix
* @param string $suffix
* @param bool $dispatchAfterCommit
* @return void
*/
public function __construct(SqsClient $sqs, $default, $prefix = '', $suffix = '')
public function __construct(SqsClient $sqs,
$default,
$prefix = '',
$suffix = '',
$dispatchAfterCommit = false)
{
$this->sqs = $sqs;
$this->prefix = $prefix;
$this->default = $default;
$this->suffix = $suffix;
$this->dispatchAfterCommit = $dispatchAfterCommit;
}

/**
Expand Down
89 changes: 89 additions & 0 deletions tests/Integration/Queue/QueueConnectionTest.php
@@ -0,0 +1,89 @@
<?php

namespace Illuminate\Tests\Integration\Queue;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Database\DatabaseTransactionsManager;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Support\Facades\Bus;
use Mockery as m;
use Orchestra\Testbench\TestCase;

/**
* @group integration
*/
class QueueConnectionTest extends TestCase
{
protected function getEnvironmentSetUp($app)
{
$app['config']->set('app.debug', 'true');
$app['config']->set('queue.default', 'sqs');
$app['config']->set('queue.connections.sqs.after_commit', true);
}

protected function tearDown(): void
{
QueueConnectionTestJob::$ran = false;

m::close();
}

public function testJobWontGetDispatchedInsideATransaction()
{
$this->app->singleton('db.transactions', function () {
$transactionManager = m::mock(DatabaseTransactionsManager::class);
$transactionManager->shouldReceive('addCallback')->once()->andReturn(null);

return $transactionManager;
});

Bus::dispatch(new QueueConnectionTestJob);
}

public function testJobWillGetDispatchedInsideATransactionWhenExplicitlyIndicated()
{
$this->app->singleton('db.transactions', function () {
$transactionManager = m::mock(DatabaseTransactionsManager::class);
$transactionManager->shouldNotReceive('addCallback')->andReturn(null);

return $transactionManager;
});

try {
Bus::dispatch((new QueueConnectionTestJob)->beforeCommit());
} catch (\Throwable $e) {
// This job was dispatched
}
}

public function testJobWontGetDispatchedInsideATransactionWhenExplicitlyIndicated()
{
$this->app['config']->set('queue.connections.sqs.after_commit', false);

$this->app->singleton('db.transactions', function () {
$transactionManager = m::mock(DatabaseTransactionsManager::class);
$transactionManager->shouldReceive('addCallback')->once()->andReturn(null);

return $transactionManager;
});

try {
Bus::dispatch((new QueueConnectionTestJob)->afterCommit());
} catch (SqsException $e) {
// This job was dispatched
}
}
}

class QueueConnectionTestJob implements ShouldQueue
{
use Dispatchable, Queueable;

public static $ran = false;

public function handle()
{
static::$ran = true;
}
}