diff --git a/src/Illuminate/Broadcasting/BroadcastEvent.php b/src/Illuminate/Broadcasting/BroadcastEvent.php index 775df78059d7..4ef4b73a4ce2 100644 --- a/src/Illuminate/Broadcasting/BroadcastEvent.php +++ b/src/Illuminate/Broadcasting/BroadcastEvent.php @@ -46,6 +46,7 @@ public function __construct($event) $this->event = $event; $this->tries = property_exists($event, 'tries') ? $event->tries : null; $this->timeout = property_exists($event, 'timeout') ? $event->timeout : null; + $this->dispatchAfterCommit = property_exists($event, 'dispatchAfterCommit') ? $event->dispatchAfterCommit : null; } /** diff --git a/src/Illuminate/Bus/Queueable.php b/src/Illuminate/Bus/Queueable.php index c2520b98c040..5cf4777620b5 100644 --- a/src/Illuminate/Bus/Queueable.php +++ b/src/Illuminate/Bus/Queueable.php @@ -52,6 +52,13 @@ trait Queueable */ public $delay; + /** + * Indicate the job should be dispatched after database transactions. + * + * @var bool|null + */ + public $dispatchAfterCommit; + /** * The middleware the job should be dispatched through. * @@ -133,6 +140,30 @@ public function delay($delay) return $this; } + /** + * Indicate that the job should be dispatched after database transactions. + * + * @return $this + */ + public function afterCommit() + { + $this->dispatchAfterCommit = true; + + return $this; + } + + /** + * Indicate that the job should be dispatched before database transactions. + * + * @return $this + */ + public function beforeCommit() + { + $this->dispatchAfterCommit = false; + + return $this; + } + /** * Specify the middleware the job should be dispatched through. * diff --git a/src/Illuminate/Events/Dispatcher.php b/src/Illuminate/Events/Dispatcher.php index 91214b457188..9116c2f0c3f5 100755 --- a/src/Illuminate/Events/Dispatcher.php +++ b/src/Illuminate/Events/Dispatcher.php @@ -557,6 +557,8 @@ protected function propagateListenerOptions($listener, $job) $job->backoff = method_exists($listener, 'backoff') ? $listener->backoff() : ($listener->backoff ?? null); $job->timeout = $listener->timeout ?? null; + $job->dispatchAfterCommit = property_exists($listener, 'dispatchAfterCommit') + ? $listener->dispatchAfterCommit : null; $job->retryUntil = method_exists($listener, 'retryUntil') ? $listener->retryUntil() : null; }); diff --git a/src/Illuminate/Foundation/Bus/PendingDispatch.php b/src/Illuminate/Foundation/Bus/PendingDispatch.php index 9495dc15c114..1d54b9534bd2 100644 --- a/src/Illuminate/Foundation/Bus/PendingDispatch.php +++ b/src/Illuminate/Foundation/Bus/PendingDispatch.php @@ -99,6 +99,30 @@ public function delay($delay) return $this; } + /** + * Indicate that the job should be dispatched after database transactions. + * + * @return $this + */ + public function afterCommit() + { + $this->job->afterCommit(); + + return $this; + } + + /** + * Indicate that the job should be dispatched before database transactions. + * + * @return $this + */ + public function beforeCommit() + { + $this->job->beforeCommit(); + + return $this; + } + /** * Set the jobs that should run if this job is successful. * diff --git a/src/Illuminate/Mail/SendQueuedMailable.php b/src/Illuminate/Mail/SendQueuedMailable.php index 76822bcd05f5..9e656c10a925 100644 --- a/src/Illuminate/Mail/SendQueuedMailable.php +++ b/src/Illuminate/Mail/SendQueuedMailable.php @@ -2,11 +2,14 @@ namespace Illuminate\Mail; +use Illuminate\Bus\Queueable; use Illuminate\Contracts\Mail\Factory as MailFactory; use Illuminate\Contracts\Mail\Mailable as MailableContract; class SendQueuedMailable { + use Queueable; + /** * The mailable message instance. * @@ -39,6 +42,7 @@ public function __construct(MailableContract $mailable) $this->mailable = $mailable; $this->tries = property_exists($mailable, 'tries') ? $mailable->tries : null; $this->timeout = property_exists($mailable, 'timeout') ? $mailable->timeout : null; + $this->dispatchAfterCommit = property_exists($mailable, 'dispatchAfterCommit') ? $mailable->dispatchAfterCommit : null; } /** diff --git a/src/Illuminate/Notifications/SendQueuedNotifications.php b/src/Illuminate/Notifications/SendQueuedNotifications.php index bab695284725..4d78873619f5 100644 --- a/src/Illuminate/Notifications/SendQueuedNotifications.php +++ b/src/Illuminate/Notifications/SendQueuedNotifications.php @@ -64,6 +64,7 @@ public function __construct($notifiables, $notification, array $channels = null) $this->notifiables = $this->wrapNotifiables($notifiables); $this->tries = property_exists($notification, 'tries') ? $notification->tries : null; $this->timeout = property_exists($notification, 'timeout') ? $notification->timeout : null; + $this->dispatchAfterCommit = property_exists($notification, 'dispatchAfterCommit') ? $notification->dispatchAfterCommit : null; } /** diff --git a/src/Illuminate/Queue/BeanstalkdQueue.php b/src/Illuminate/Queue/BeanstalkdQueue.php index 0c99c7e5e3ec..b258c49418ce 100755 --- a/src/Illuminate/Queue/BeanstalkdQueue.php +++ b/src/Illuminate/Queue/BeanstalkdQueue.php @@ -44,14 +44,20 @@ class BeanstalkdQueue extends Queue implements QueueContract * @param string $default * @param int $timeToRun * @param int $blockFor + * @param bool $dispatchAfterCommit * @return void */ - public function __construct(Pheanstalk $pheanstalk, $default, $timeToRun, $blockFor = 0) + public function __construct(Pheanstalk $pheanstalk, + $default, + $timeToRun, + $blockFor = 0, + $dispatchAfterCommit = false) { $this->default = $default; $this->blockFor = $blockFor; $this->timeToRun = $timeToRun; $this->pheanstalk = $pheanstalk; + $this->dispatchAfterCommit = $dispatchAfterCommit; } /** diff --git a/src/Illuminate/Queue/Connectors/BeanstalkdConnector.php b/src/Illuminate/Queue/Connectors/BeanstalkdConnector.php index b54d80193b69..fdcdb355594e 100755 --- a/src/Illuminate/Queue/Connectors/BeanstalkdConnector.php +++ b/src/Illuminate/Queue/Connectors/BeanstalkdConnector.php @@ -20,7 +20,8 @@ public function connect(array $config) $this->pheanstalk($config), $config['queue'], $config['retry_after'] ?? Pheanstalk::DEFAULT_TTR, - $config['block_for'] ?? 0 + $config['block_for'] ?? 0, + $config['after_commit'] ?? null ); } diff --git a/src/Illuminate/Queue/Connectors/DatabaseConnector.php b/src/Illuminate/Queue/Connectors/DatabaseConnector.php index 893a898f6b66..eeabc8ee7f7f 100644 --- a/src/Illuminate/Queue/Connectors/DatabaseConnector.php +++ b/src/Illuminate/Queue/Connectors/DatabaseConnector.php @@ -37,7 +37,8 @@ public function connect(array $config) $this->connections->connection($config['connection'] ?? null), $config['table'], $config['queue'], - $config['retry_after'] ?? 60 + $config['retry_after'] ?? 60, + $config['after_commit'] ?? null ); } } diff --git a/src/Illuminate/Queue/Connectors/RedisConnector.php b/src/Illuminate/Queue/Connectors/RedisConnector.php index 1efe5f65e903..966fe49071a8 100644 --- a/src/Illuminate/Queue/Connectors/RedisConnector.php +++ b/src/Illuminate/Queue/Connectors/RedisConnector.php @@ -46,7 +46,8 @@ public function connect(array $config) $this->redis, $config['queue'], $config['connection'] ?? $this->connection, $config['retry_after'] ?? 60, - $config['block_for'] ?? null + $config['block_for'] ?? null, + $config['after_commit'] ?? null ); } } diff --git a/src/Illuminate/Queue/Connectors/SqsConnector.php b/src/Illuminate/Queue/Connectors/SqsConnector.php index 07d7f8232674..029c607c4328 100755 --- a/src/Illuminate/Queue/Connectors/SqsConnector.php +++ b/src/Illuminate/Queue/Connectors/SqsConnector.php @@ -23,7 +23,11 @@ public function connect(array $config) } return new SqsQueue( - new SqsClient($config), $config['queue'], $config['prefix'] ?? '', $config['suffix'] ?? '' + new SqsClient($config), + $config['queue'], + $config['prefix'] ?? '', + $config['suffix'] ?? '', + $config['after_commit'] ?? null ); } diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index 40023ae34e3d..b8f525918098 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -34,6 +34,13 @@ abstract class Queue */ protected static $createPayloadCallbacks = []; + /** + * Indicate the job should be dispatched after database transactions. + * + * @var bool|null + */ + protected $dispatchAfterCommit; + /** * Push a new job onto the queue. * @@ -268,9 +275,51 @@ protected function withCreatePayloadHooks($queue, array $payload) */ protected function enqueueUsing($job, $payload, $queue, $delay, $callback) { + if ($this->shouldDispatchAfterCommit($job) && + $this->container->bound('db.transactions')) { + return $this->container->make('db.transactions')->addCallback( + $this->afterCommitCallback($payload, $queue, $delay, $callback) + ); + } + return $callback($payload, $queue, $delay); } + /** + * Determine if the job should be dispatched after database transactions. + * + * @param \Closure|string|object $job + * @return bool + */ + protected function shouldDispatchAfterCommit($job) + { + if (is_object($job) && isset($job->dispatchAfterCommit)) { + return $job->dispatchAfterCommit; + } + + if (isset($this->dispatchAfterCommit)) { + return $this->dispatchAfterCommit; + } + + return false; + } + + /** + * Create the after commit callback. + * + * @param string $payload + * @param string $queue + * @param \DateTimeInterface|\DateInterval|int|null $delay + * @param callable $callback + * @return callable + */ + protected function afterCommitCallback($payload, $queue, $delay, $callback) + { + return function () use ($delay, $queue, $payload, $callback) { + return $callback($payload, $queue, $delay); + }; + } + /** * Get the connection name for the queue. * diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index 080aeb315581..3aca40c9c388 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -53,15 +53,22 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue * @param string|null $connection * @param int $retryAfter * @param int|null $blockFor + * @param bool $dispatchAfterCommit * @return void */ - public function __construct(Redis $redis, $default = 'default', $connection = null, $retryAfter = 60, $blockFor = null) + public function __construct(Redis $redis, + $default = 'default', + $connection = null, + $retryAfter = 60, + $blockFor = null, + $dispatchAfterCommit = false) { $this->redis = $redis; $this->default = $default; $this->blockFor = $blockFor; $this->connection = $connection; $this->retryAfter = $retryAfter; + $this->dispatchAfterCommit = $dispatchAfterCommit; } /** diff --git a/src/Illuminate/Queue/SqsQueue.php b/src/Illuminate/Queue/SqsQueue.php index 281cb862f61f..1ef834bb7bae 100755 --- a/src/Illuminate/Queue/SqsQueue.php +++ b/src/Illuminate/Queue/SqsQueue.php @@ -45,14 +45,20 @@ class SqsQueue extends Queue implements QueueContract, ClearableQueue * @param string $default * @param string $prefix * @param string $suffix + * @param bool $dispatchAfterCommit * @return void */ - public function __construct(SqsClient $sqs, $default, $prefix = '', $suffix = '') + public function __construct(SqsClient $sqs, + $default, + $prefix = '', + $suffix = '', + $dispatchAfterCommit = false) { $this->sqs = $sqs; $this->prefix = $prefix; $this->default = $default; $this->suffix = $suffix; + $this->dispatchAfterCommit = $dispatchAfterCommit; } /** diff --git a/tests/Integration/Queue/QueueConnectionTest.php b/tests/Integration/Queue/QueueConnectionTest.php new file mode 100644 index 000000000000..d9824e22d077 --- /dev/null +++ b/tests/Integration/Queue/QueueConnectionTest.php @@ -0,0 +1,89 @@ +set('app.debug', 'true'); + $app['config']->set('queue.default', 'sqs'); + $app['config']->set('queue.connections.sqs.after_commit', true); + } + + protected function tearDown(): void + { + QueueConnectionTestJob::$ran = false; + + m::close(); + } + + public function testJobWontGetDispatchedInsideATransaction() + { + $this->app->singleton('db.transactions', function () { + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldReceive('addCallback')->once()->andReturn(null); + + return $transactionManager; + }); + + Bus::dispatch(new QueueConnectionTestJob); + } + + public function testJobWillGetDispatchedInsideATransactionWhenExplicitlyIndicated() + { + $this->app->singleton('db.transactions', function () { + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldNotReceive('addCallback')->andReturn(null); + + return $transactionManager; + }); + + try { + Bus::dispatch((new QueueConnectionTestJob)->beforeCommit()); + } catch (\Throwable $e) { + // This job was dispatched + } + } + + public function testJobWontGetDispatchedInsideATransactionWhenExplicitlyIndicated() + { + $this->app['config']->set('queue.connections.sqs.after_commit', false); + + $this->app->singleton('db.transactions', function () { + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldReceive('addCallback')->once()->andReturn(null); + + return $transactionManager; + }); + + try { + Bus::dispatch((new QueueConnectionTestJob)->afterCommit()); + } catch (SqsException $e) { + // This job was dispatched + } + } +} + +class QueueConnectionTestJob implements ShouldQueue +{ + use Dispatchable, Queueable; + + public static $ran = false; + + public function handle() + { + static::$ran = true; + } +}