From 4ce7b2e7fffd03b9407e69f7f49804aed08c6188 Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Mon, 30 Nov 2020 17:08:20 +0200 Subject: [PATCH 1/8] allow marking jobs to be dispatched after db transactions commit --- .../Broadcasting/BroadcastEvent.php | 1 + src/Illuminate/Bus/Queueable.php | 20 +++++++++++++++++++ src/Illuminate/Events/Dispatcher.php | 2 ++ src/Illuminate/Mail/SendQueuedMailable.php | 4 ++++ .../Notifications/SendQueuedNotifications.php | 1 + src/Illuminate/Queue/BeanstalkdQueue.php | 8 +++++++- .../Queue/Connectors/BeanstalkdConnector.php | 3 ++- .../Queue/Connectors/DatabaseConnector.php | 3 ++- .../Queue/Connectors/RedisConnector.php | 3 ++- .../Queue/Connectors/SqsConnector.php | 6 +++++- src/Illuminate/Queue/Queue.php | 7 +++++++ src/Illuminate/Queue/RedisQueue.php | 9 ++++++++- src/Illuminate/Queue/SqsQueue.php | 8 +++++++- 13 files changed, 68 insertions(+), 7 deletions(-) diff --git a/src/Illuminate/Broadcasting/BroadcastEvent.php b/src/Illuminate/Broadcasting/BroadcastEvent.php index 775df78059d7..4ef4b73a4ce2 100644 --- a/src/Illuminate/Broadcasting/BroadcastEvent.php +++ b/src/Illuminate/Broadcasting/BroadcastEvent.php @@ -46,6 +46,7 @@ public function __construct($event) $this->event = $event; $this->tries = property_exists($event, 'tries') ? $event->tries : null; $this->timeout = property_exists($event, 'timeout') ? $event->timeout : null; + $this->dispatchAfterCommit = property_exists($event, 'dispatchAfterCommit') ? $event->dispatchAfterCommit : null; } /** diff --git a/src/Illuminate/Bus/Queueable.php b/src/Illuminate/Bus/Queueable.php index c2520b98c040..68866deae292 100644 --- a/src/Illuminate/Bus/Queueable.php +++ b/src/Illuminate/Bus/Queueable.php @@ -52,6 +52,13 @@ trait Queueable */ public $delay; + /** + * Indicate the job should be dispatched after database transactions. + * + * @var bool|null + */ + public $dispatchAfterCommit; + /** * The middleware the job should be dispatched through. * @@ -133,6 +140,19 @@ public function delay($delay) return $this; } + /** + * Indicate that the job should be dispatched after database transactions. + * + * @param bool|null $value + * @return $this + */ + public function afterCommit($value = true) + { + $this->dispatchAfterCommit = $value; + + return $this; + } + /** * Specify the middleware the job should be dispatched through. * diff --git a/src/Illuminate/Events/Dispatcher.php b/src/Illuminate/Events/Dispatcher.php index 91214b457188..9116c2f0c3f5 100755 --- a/src/Illuminate/Events/Dispatcher.php +++ b/src/Illuminate/Events/Dispatcher.php @@ -557,6 +557,8 @@ protected function propagateListenerOptions($listener, $job) $job->backoff = method_exists($listener, 'backoff') ? $listener->backoff() : ($listener->backoff ?? null); $job->timeout = $listener->timeout ?? null; + $job->dispatchAfterCommit = property_exists($listener, 'dispatchAfterCommit') + ? $listener->dispatchAfterCommit : null; $job->retryUntil = method_exists($listener, 'retryUntil') ? $listener->retryUntil() : null; }); diff --git a/src/Illuminate/Mail/SendQueuedMailable.php b/src/Illuminate/Mail/SendQueuedMailable.php index 76822bcd05f5..9e656c10a925 100644 --- a/src/Illuminate/Mail/SendQueuedMailable.php +++ b/src/Illuminate/Mail/SendQueuedMailable.php @@ -2,11 +2,14 @@ namespace Illuminate\Mail; +use Illuminate\Bus\Queueable; use Illuminate\Contracts\Mail\Factory as MailFactory; use Illuminate\Contracts\Mail\Mailable as MailableContract; class SendQueuedMailable { + use Queueable; + /** * The mailable message instance. * @@ -39,6 +42,7 @@ public function __construct(MailableContract $mailable) $this->mailable = $mailable; $this->tries = property_exists($mailable, 'tries') ? $mailable->tries : null; $this->timeout = property_exists($mailable, 'timeout') ? $mailable->timeout : null; + $this->dispatchAfterCommit = property_exists($mailable, 'dispatchAfterCommit') ? $mailable->dispatchAfterCommit : null; } /** diff --git a/src/Illuminate/Notifications/SendQueuedNotifications.php b/src/Illuminate/Notifications/SendQueuedNotifications.php index bab695284725..4d78873619f5 100644 --- a/src/Illuminate/Notifications/SendQueuedNotifications.php +++ b/src/Illuminate/Notifications/SendQueuedNotifications.php @@ -64,6 +64,7 @@ public function __construct($notifiables, $notification, array $channels = null) $this->notifiables = $this->wrapNotifiables($notifiables); $this->tries = property_exists($notification, 'tries') ? $notification->tries : null; $this->timeout = property_exists($notification, 'timeout') ? $notification->timeout : null; + $this->dispatchAfterCommit = property_exists($notification, 'dispatchAfterCommit') ? $notification->dispatchAfterCommit : null; } /** diff --git a/src/Illuminate/Queue/BeanstalkdQueue.php b/src/Illuminate/Queue/BeanstalkdQueue.php index 49c36bdac07f..e867976014e4 100755 --- a/src/Illuminate/Queue/BeanstalkdQueue.php +++ b/src/Illuminate/Queue/BeanstalkdQueue.php @@ -44,14 +44,20 @@ class BeanstalkdQueue extends Queue implements QueueContract * @param string $default * @param int $timeToRun * @param int $blockFor + * @param bool $dispatchAfterCommit * @return void */ - public function __construct(Pheanstalk $pheanstalk, $default, $timeToRun, $blockFor = 0) + public function __construct(Pheanstalk $pheanstalk, + $default, + $timeToRun, + $blockFor = 0, + $dispatchAfterCommit = false) { $this->default = $default; $this->blockFor = $blockFor; $this->timeToRun = $timeToRun; $this->pheanstalk = $pheanstalk; + $this->dispatchAfterCommit = $dispatchAfterCommit; } /** diff --git a/src/Illuminate/Queue/Connectors/BeanstalkdConnector.php b/src/Illuminate/Queue/Connectors/BeanstalkdConnector.php index b54d80193b69..fdcdb355594e 100755 --- a/src/Illuminate/Queue/Connectors/BeanstalkdConnector.php +++ b/src/Illuminate/Queue/Connectors/BeanstalkdConnector.php @@ -20,7 +20,8 @@ public function connect(array $config) $this->pheanstalk($config), $config['queue'], $config['retry_after'] ?? Pheanstalk::DEFAULT_TTR, - $config['block_for'] ?? 0 + $config['block_for'] ?? 0, + $config['after_commit'] ?? null ); } diff --git a/src/Illuminate/Queue/Connectors/DatabaseConnector.php b/src/Illuminate/Queue/Connectors/DatabaseConnector.php index 893a898f6b66..eeabc8ee7f7f 100644 --- a/src/Illuminate/Queue/Connectors/DatabaseConnector.php +++ b/src/Illuminate/Queue/Connectors/DatabaseConnector.php @@ -37,7 +37,8 @@ public function connect(array $config) $this->connections->connection($config['connection'] ?? null), $config['table'], $config['queue'], - $config['retry_after'] ?? 60 + $config['retry_after'] ?? 60, + $config['after_commit'] ?? null ); } } diff --git a/src/Illuminate/Queue/Connectors/RedisConnector.php b/src/Illuminate/Queue/Connectors/RedisConnector.php index 1efe5f65e903..966fe49071a8 100644 --- a/src/Illuminate/Queue/Connectors/RedisConnector.php +++ b/src/Illuminate/Queue/Connectors/RedisConnector.php @@ -46,7 +46,8 @@ public function connect(array $config) $this->redis, $config['queue'], $config['connection'] ?? $this->connection, $config['retry_after'] ?? 60, - $config['block_for'] ?? null + $config['block_for'] ?? null, + $config['after_commit'] ?? null ); } } diff --git a/src/Illuminate/Queue/Connectors/SqsConnector.php b/src/Illuminate/Queue/Connectors/SqsConnector.php index 07d7f8232674..029c607c4328 100755 --- a/src/Illuminate/Queue/Connectors/SqsConnector.php +++ b/src/Illuminate/Queue/Connectors/SqsConnector.php @@ -23,7 +23,11 @@ public function connect(array $config) } return new SqsQueue( - new SqsClient($config), $config['queue'], $config['prefix'] ?? '', $config['suffix'] ?? '' + new SqsClient($config), + $config['queue'], + $config['prefix'] ?? '', + $config['suffix'] ?? '', + $config['after_commit'] ?? null ); } diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index 2cc64a60e3e4..564fd9373edf 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -34,6 +34,13 @@ abstract class Queue */ protected static $createPayloadCallbacks = []; + /** + * Indicate the job should be dispatched after database transactions. + * + * @var bool|null + */ + public $dispatchAfterCommit; + /** * Push a new job onto the queue. * diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index 19fc07589497..81a878b5123c 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -53,15 +53,22 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue * @param string|null $connection * @param int $retryAfter * @param int|null $blockFor + * @param bool $dispatchAfterCommit * @return void */ - public function __construct(Redis $redis, $default = 'default', $connection = null, $retryAfter = 60, $blockFor = null) + public function __construct(Redis $redis, + $default = 'default', + $connection = null, + $retryAfter = 60, + $blockFor = null, + $dispatchAfterCommit = false) { $this->redis = $redis; $this->default = $default; $this->blockFor = $blockFor; $this->connection = $connection; $this->retryAfter = $retryAfter; + $this->dispatchAfterCommit = $dispatchAfterCommit; } /** diff --git a/src/Illuminate/Queue/SqsQueue.php b/src/Illuminate/Queue/SqsQueue.php index 8ec5fd110b45..f0d5273b68e9 100755 --- a/src/Illuminate/Queue/SqsQueue.php +++ b/src/Illuminate/Queue/SqsQueue.php @@ -45,14 +45,20 @@ class SqsQueue extends Queue implements QueueContract, ClearableQueue * @param string $default * @param string $prefix * @param string $suffix + * @param bool $dispatchAfterCommit * @return void */ - public function __construct(SqsClient $sqs, $default, $prefix = '', $suffix = '') + public function __construct(SqsClient $sqs, + $default, + $prefix = '', + $suffix = '', + $dispatchAfterCommit = false) { $this->sqs = $sqs; $this->prefix = $prefix; $this->default = $default; $this->suffix = $suffix; + $this->dispatchAfterCommit = $dispatchAfterCommit; } /** From 4f94ff94188b84852a98fae9c6971f56cd99f5d5 Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Mon, 30 Nov 2020 17:50:02 +0200 Subject: [PATCH 2/8] add tests --- src/Illuminate/Bus/Queueable.php | 17 +++- src/Illuminate/Queue/Queue.php | 29 +++++- .../Integration/Queue/QueueConnectionTest.php | 95 +++++++++++++++++++ 3 files changed, 137 insertions(+), 4 deletions(-) create mode 100644 tests/Integration/Queue/QueueConnectionTest.php diff --git a/src/Illuminate/Bus/Queueable.php b/src/Illuminate/Bus/Queueable.php index 68866deae292..5cf4777620b5 100644 --- a/src/Illuminate/Bus/Queueable.php +++ b/src/Illuminate/Bus/Queueable.php @@ -143,12 +143,23 @@ public function delay($delay) /** * Indicate that the job should be dispatched after database transactions. * - * @param bool|null $value * @return $this */ - public function afterCommit($value = true) + public function afterCommit() { - $this->dispatchAfterCommit = $value; + $this->dispatchAfterCommit = true; + + return $this; + } + + /** + * Indicate that the job should be dispatched before database transactions. + * + * @return $this + */ + public function beforeCommit() + { + $this->dispatchAfterCommit = false; return $this; } diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index 10dbc5362e07..7be654a7d5bf 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -39,7 +39,7 @@ abstract class Queue * * @var bool|null */ - public $dispatchAfterCommit; + protected $dispatchAfterCommit; /** * Push a new job onto the queue. @@ -272,9 +272,36 @@ protected function withCreatePayloadHooks($queue, array $payload) */ protected function enqueueUsing($job, $callback) { + if ($this->shouldDispatchAfterCommit($job) && + $this->container->bound('db.transactions')) { + return $this->container->make('db.transactions')->addCallback( + $callback + ); + + } + return $callback(); } + /** + * Determine if the job should be dispatched after database transactions. + * + * @param \Closure|string|object $job + * @return bool + */ + protected function shouldDispatchAfterCommit($job) + { + if (is_object($job) && isset($job->dispatchAfterCommit)) { + return $job->dispatchAfterCommit; + } + + if (isset($this->dispatchAfterCommit)) { + return $this->dispatchAfterCommit; + } + + return false; + } + /** * Get the connection name for the queue. * diff --git a/tests/Integration/Queue/QueueConnectionTest.php b/tests/Integration/Queue/QueueConnectionTest.php new file mode 100644 index 000000000000..85a2c54d3e12 --- /dev/null +++ b/tests/Integration/Queue/QueueConnectionTest.php @@ -0,0 +1,95 @@ +set('app.debug', 'true'); + $app['config']->set('queue.default', 'sqs'); + $app['config']->set('queue.connections.sqs.after_commit', true); + } + + protected function tearDown(): void + { + QueueConnectionTestJob::$ran = false; + + m::close(); + } + + public function testJobWontGetDispatchedInsideATransaction() + { + + $this->app->singleton('db.transactions', function () { + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldReceive('addCallback')->once()->andReturn(null); + + return $transactionManager; + }); + + + Bus::dispatch(new QueueConnectionTestJob); + } + + public function testJobWillGetDispatchedInsideATransactionWhenExplicitlyIndicated() + { + $this->app->singleton('db.transactions', function () { + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldNotReceive('addCallback')->andReturn(null); + + return $transactionManager; + }); + + + try { + Bus::dispatch((new QueueConnectionTestJob)->beforeCommit()); + } catch (SqsException $e) { + // This job was dispatched + } + } + + public function testJobWontGetDispatchedInsideATransactionWhenExplicitlyIndicated() + { + $this->app['config']->set('queue.connections.sqs.after_commit', false); + + $this->app->singleton('db.transactions', function () { + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldReceive('addCallback')->once()->andReturn(null); + + return $transactionManager; + }); + + + try { + Bus::dispatch((new QueueConnectionTestJob)->afterCommit()); + } catch (SqsException $e) { + // This job was dispatched + } + } +} + +class QueueConnectionTestJob implements ShouldQueue +{ + use Dispatchable, Queueable; + + public static $ran = false; + + public function handle() + { + static::$ran = true; + } +} From 4f76ee512f527c9a3313ab42cae3bd9da6825ce8 Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Mon, 30 Nov 2020 17:54:22 +0200 Subject: [PATCH 3/8] include afterCommit and beforeCommit in PendingDispatch --- .../Foundation/Bus/PendingDispatch.php | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/Illuminate/Foundation/Bus/PendingDispatch.php b/src/Illuminate/Foundation/Bus/PendingDispatch.php index 9495dc15c114..1d54b9534bd2 100644 --- a/src/Illuminate/Foundation/Bus/PendingDispatch.php +++ b/src/Illuminate/Foundation/Bus/PendingDispatch.php @@ -99,6 +99,30 @@ public function delay($delay) return $this; } + /** + * Indicate that the job should be dispatched after database transactions. + * + * @return $this + */ + public function afterCommit() + { + $this->job->afterCommit(); + + return $this; + } + + /** + * Indicate that the job should be dispatched before database transactions. + * + * @return $this + */ + public function beforeCommit() + { + $this->job->beforeCommit(); + + return $this; + } + /** * Set the jobs that should run if this job is successful. * From 56e1b24b0f04cb3398055109ef3028b66df64473 Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Mon, 30 Nov 2020 17:55:57 +0200 Subject: [PATCH 4/8] fix style --- tests/Integration/Queue/QueueConnectionTest.php | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/Integration/Queue/QueueConnectionTest.php b/tests/Integration/Queue/QueueConnectionTest.php index 85a2c54d3e12..1dd5e2121fd0 100644 --- a/tests/Integration/Queue/QueueConnectionTest.php +++ b/tests/Integration/Queue/QueueConnectionTest.php @@ -5,7 +5,6 @@ use Aws\Sqs\Exception\SqsException; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; -use Illuminate\Database\Connection; use Illuminate\Database\DatabaseTransactionsManager; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Support\Facades\Bus; @@ -41,7 +40,6 @@ public function testJobWontGetDispatchedInsideATransaction() return $transactionManager; }); - Bus::dispatch(new QueueConnectionTestJob); } @@ -54,7 +52,6 @@ public function testJobWillGetDispatchedInsideATransactionWhenExplicitlyIndicate return $transactionManager; }); - try { Bus::dispatch((new QueueConnectionTestJob)->beforeCommit()); } catch (SqsException $e) { @@ -73,7 +70,6 @@ public function testJobWontGetDispatchedInsideATransactionWhenExplicitlyIndicate return $transactionManager; }); - try { Bus::dispatch((new QueueConnectionTestJob)->afterCommit()); } catch (SqsException $e) { From 2ba6890a53b2b81da93e553c1ab42e39bfb4bc3a Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Mon, 30 Nov 2020 17:58:16 +0200 Subject: [PATCH 5/8] fix style --- src/Illuminate/Queue/Queue.php | 1 - tests/Integration/Queue/QueueConnectionTest.php | 1 - 2 files changed, 2 deletions(-) diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index 7be654a7d5bf..589dc9c98a50 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -277,7 +277,6 @@ protected function enqueueUsing($job, $callback) return $this->container->make('db.transactions')->addCallback( $callback ); - } return $callback(); diff --git a/tests/Integration/Queue/QueueConnectionTest.php b/tests/Integration/Queue/QueueConnectionTest.php index 1dd5e2121fd0..3623c7360a3b 100644 --- a/tests/Integration/Queue/QueueConnectionTest.php +++ b/tests/Integration/Queue/QueueConnectionTest.php @@ -32,7 +32,6 @@ protected function tearDown(): void public function testJobWontGetDispatchedInsideATransaction() { - $this->app->singleton('db.transactions', function () { $transactionManager = m::mock(DatabaseTransactionsManager::class); $transactionManager->shouldReceive('addCallback')->once()->andReturn(null); From 399672c2e42aee28b6524d1300339aadbce762a5 Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Mon, 30 Nov 2020 18:02:23 +0200 Subject: [PATCH 6/8] fix tests --- tests/Integration/Queue/QueueConnectionTest.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/Integration/Queue/QueueConnectionTest.php b/tests/Integration/Queue/QueueConnectionTest.php index 3623c7360a3b..d9824e22d077 100644 --- a/tests/Integration/Queue/QueueConnectionTest.php +++ b/tests/Integration/Queue/QueueConnectionTest.php @@ -2,7 +2,6 @@ namespace Illuminate\Tests\Integration\Queue; -use Aws\Sqs\Exception\SqsException; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Database\DatabaseTransactionsManager; @@ -53,7 +52,7 @@ public function testJobWillGetDispatchedInsideATransactionWhenExplicitlyIndicate try { Bus::dispatch((new QueueConnectionTestJob)->beforeCommit()); - } catch (SqsException $e) { + } catch (\Throwable $e) { // This job was dispatched } } From 5ef349b92e99537948f8e3187817e54ff082e7e1 Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Wed, 9 Dec 2020 16:49:06 +0200 Subject: [PATCH 7/8] fix tests --- src/Illuminate/Queue/Queue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index 8fac36a6c498..b6f87b969ec7 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -278,7 +278,7 @@ protected function enqueueUsing($job, $payload, $queue, $delay, $callback) if ($this->shouldDispatchAfterCommit($job) && $this->container->bound('db.transactions')) { return $this->container->make('db.transactions')->addCallback( - $callback($payload, $queue, $delay) + $callback ); } From ed6bd2eaddafddd6be05c7144114c78803f4c1a7 Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Wed, 9 Dec 2020 16:58:18 +0200 Subject: [PATCH 8/8] fix tests --- src/Illuminate/Queue/Queue.php | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index b6f87b969ec7..b8f525918098 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -278,7 +278,7 @@ protected function enqueueUsing($job, $payload, $queue, $delay, $callback) if ($this->shouldDispatchAfterCommit($job) && $this->container->bound('db.transactions')) { return $this->container->make('db.transactions')->addCallback( - $callback + $this->afterCommitCallback($payload, $queue, $delay, $callback) ); } @@ -304,6 +304,22 @@ protected function shouldDispatchAfterCommit($job) return false; } + /** + * Create the after commit callback. + * + * @param string $payload + * @param string $queue + * @param \DateTimeInterface|\DateInterval|int|null $delay + * @param callable $callback + * @return callable + */ + protected function afterCommitCallback($payload, $queue, $delay, $callback) + { + return function () use ($delay, $queue, $payload, $callback) { + return $callback($payload, $queue, $delay); + }; + } + /** * Get the connection name for the queue. *