diff --git a/src/Illuminate/Contracts/Queue/UniqueJob.php b/src/Illuminate/Contracts/Queue/UniqueJob.php new file mode 100644 index 000000000000..603be95f5d42 --- /dev/null +++ b/src/Illuminate/Contracts/Queue/UniqueJob.php @@ -0,0 +1,8 @@ +job instanceof UniqueJob)) { + return true; + } + + $uniqueId = method_exists($this->job, 'uniqueId') + ? $this->job->uniqueId() + : ($this->job->uniqueId ?? ''); + + $lock = Container::getInstance()->make(Cache::class)->lock( + $key = 'unique:'.get_class($this->job).$uniqueId, + $this->job->uniqueFor ?? 0 + ); + + return (bool) $lock->get(); + } + /** * Dynamically proxy methods to the underlying job. * @@ -142,7 +168,9 @@ public function __call($method, $parameters) */ public function __destruct() { - if ($this->afterResponse) { + if (! $this->shouldDispatch()) { + // Do nothing. + } elseif ($this->afterResponse) { app(Dispatcher::class)->dispatchAfterResponse($this->job); } else { app(Dispatcher::class)->dispatch($this->job); diff --git a/src/Illuminate/Foundation/Console/stubs/job.queued.stub b/src/Illuminate/Foundation/Console/stubs/job.queued.stub index 5f0651cb1789..0be12826fb73 100644 --- a/src/Illuminate/Foundation/Console/stubs/job.queued.stub +++ b/src/Illuminate/Foundation/Console/stubs/job.queued.stub @@ -4,6 +4,7 @@ namespace {{ namespace }}; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; +use Illuminate\Contracts\Queue\UniqueJob; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; diff --git a/src/Illuminate/Queue/CallQueuedHandler.php b/src/Illuminate/Queue/CallQueuedHandler.php index 4a9c9e1f20e4..ec3bfa48ea27 100644 --- a/src/Illuminate/Queue/CallQueuedHandler.php +++ b/src/Illuminate/Queue/CallQueuedHandler.php @@ -5,8 +5,10 @@ use Exception; use Illuminate\Bus\Batchable; use Illuminate\Contracts\Bus\Dispatcher; +use Illuminate\Contracts\Cache\Repository as Cache; use Illuminate\Contracts\Container\Container; use Illuminate\Contracts\Queue\Job; +use Illuminate\Contracts\Queue\UniqueJob; use Illuminate\Database\Eloquent\ModelNotFoundException; use Illuminate\Pipeline\Pipeline; use ReflectionClass; @@ -59,6 +61,10 @@ public function call(Job $job, array $data) $this->dispatchThroughMiddleware($job, $command); + if (! $job->isReleased()) { + $this->ensureUniqueJobLockIsReleased($command); + } + if (! $job->hasFailed() && ! $job->isReleased()) { $this->ensureNextJobInChainIsDispatched($command); $this->ensureSuccessfulBatchJobIsRecorded($command); @@ -153,6 +159,25 @@ protected function ensureSuccessfulBatchJobIsRecorded($command) $command->batch()->recordSuccessfulJob($command->job->uuid()); } + /** + * Ensure the lock for the unique job is released. + * + * @param mixed $command + * @return void + */ + protected function ensureUniqueJobLockIsReleased($command) + { + if ($command instanceof UniqueJob) { + $uniqueId = method_exists($command, 'uniqueId') + ? $command->uniqueId() + : ($command->uniqueId ?? ''); + + $this->container->make(Cache::class) + ->lock('unique:'.get_class($command).$uniqueId) + ->forceRelease(); + } + } + /** * Handle a model not found exception. * @@ -192,6 +217,7 @@ public function failed(array $data, $e, string $uuid) { $command = unserialize($data['command']); + $this->ensureUniqueJobLockIsReleased($command); $this->ensureFailedBatchJobIsRecorded($uuid, $command, $e); $this->ensureChainCatchCallbacksAreInvoked($uuid, $command, $e); diff --git a/tests/Integration/Queue/RateLimitedTest.php b/tests/Integration/Queue/RateLimitedTest.php index c4f12f6144e0..d73dd58a2c29 100644 --- a/tests/Integration/Queue/RateLimitedTest.php +++ b/tests/Integration/Queue/RateLimitedTest.php @@ -88,7 +88,7 @@ protected function assertJobRanSuccessfully($class) $job = m::mock(Job::class); $job->shouldReceive('hasFailed')->once()->andReturn(false); - $job->shouldReceive('isReleased')->once()->andReturn(false); + $job->shouldReceive('isReleased')->andReturn(false); $job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false); $job->shouldReceive('delete')->once(); @@ -108,7 +108,7 @@ protected function assertJobWasReleased($class) $job->shouldReceive('hasFailed')->once()->andReturn(false); $job->shouldReceive('release')->once(); - $job->shouldReceive('isReleased')->once()->andReturn(true); + $job->shouldReceive('isReleased')->andReturn(true); $job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true); $instance->call($job, [ @@ -126,7 +126,7 @@ protected function assertJobWasSkipped($class) $job = m::mock(Job::class); $job->shouldReceive('hasFailed')->once()->andReturn(false); - $job->shouldReceive('isReleased')->once()->andReturn(false); + $job->shouldReceive('isReleased')->andReturn(false); $job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false); $job->shouldReceive('delete')->once(); diff --git a/tests/Integration/Queue/RateLimitedWithRedisTest.php b/tests/Integration/Queue/RateLimitedWithRedisTest.php index 5956fff06180..175f531bbfa0 100644 --- a/tests/Integration/Queue/RateLimitedWithRedisTest.php +++ b/tests/Integration/Queue/RateLimitedWithRedisTest.php @@ -119,7 +119,7 @@ protected function assertJobRanSuccessfully($testJob) $job = m::mock(Job::class); $job->shouldReceive('hasFailed')->once()->andReturn(false); - $job->shouldReceive('isReleased')->once()->andReturn(false); + $job->shouldReceive('isReleased')->andReturn(false); $job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false); $job->shouldReceive('delete')->once(); @@ -139,7 +139,7 @@ protected function assertJobWasReleased($testJob) $job->shouldReceive('hasFailed')->once()->andReturn(false); $job->shouldReceive('release')->once(); - $job->shouldReceive('isReleased')->once()->andReturn(true); + $job->shouldReceive('isReleased')->andReturn(true); $job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true); $instance->call($job, [ @@ -157,7 +157,7 @@ protected function assertJobWasSkipped($testJob) $job = m::mock(Job::class); $job->shouldReceive('hasFailed')->once()->andReturn(false); - $job->shouldReceive('isReleased')->once()->andReturn(false); + $job->shouldReceive('isReleased')->andReturn(false); $job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false); $job->shouldReceive('delete')->once(); diff --git a/tests/Integration/Queue/UniqueJobTest.php b/tests/Integration/Queue/UniqueJobTest.php new file mode 100644 index 000000000000..45c09338ae40 --- /dev/null +++ b/tests/Integration/Queue/UniqueJobTest.php @@ -0,0 +1,196 @@ +set('database.default', 'testbench'); + $app['config']->set('database.connections.testbench', [ + 'driver' => 'sqlite', + 'database' => ':memory:', + 'prefix' => '', + ]); + + $app['db']->connection()->getSchemaBuilder()->create('jobs', function (Blueprint $table) { + $table->bigIncrements('id'); + $table->string('queue'); + $table->longText('payload'); + $table->tinyInteger('attempts')->unsigned(); + $table->unsignedInteger('reserved_at')->nullable(); + $table->unsignedInteger('available_at'); + $table->unsignedInteger('created_at'); + $table->index(['queue', 'reserved_at']); + }); + } + + protected function tearDown(): void + { + $this->app['db']->connection()->getSchemaBuilder()->drop('jobs'); + + parent::tearDown(); + + m::close(); + } + + public function testUniqueJobsAreNotDispatched() + { + Bus::fake(); + UniqueTestJob::dispatch(); + Bus::assertDispatched(UniqueTestJob::class); + + $this->assertFalse( + $this->app->get(Cache::class)->lock($this->getLockKey(UniqueTestJob::class), 10)->get() + ); + + Bus::fake(); + UniqueTestJob::dispatch(); + Bus::assertNotDispatched(UniqueTestJob::class); + + $this->assertFalse( + $this->app->get(Cache::class)->lock($this->getLockKey(UniqueTestJob::class), 10)->get() + ); + } + + public function testLockIsReleasedForSuccessfulJobs() + { + UniqueTestJob::$handled = false; + dispatch($job = new UniqueTestJob); + + $this->assertTrue($job::$handled); + $this->assertTrue($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get()); + } + + public function testLockIsReleasedForFailedJobs() + { + UniqueTestFailJob::$handled = false; + + $this->expectException(\Exception::class); + + try { + dispatch($job = new UniqueTestFailJob); + } finally { + $this->assertTrue($job::$handled); + $this->assertTrue($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get()); + } + } + + public function testLockIsNotReleasedForJobRetries() + { + UniqueTestRetryJob::$handled = false; + dispatch($job = new UniqueTestRetryJob); + + $this->assertFalse($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get()); + + $this->artisan('queue:work', [ + 'connection' => 'database', + '--once' => true, + ]); + + $this->assertTrue($job::$handled); + $this->assertFalse($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get()); + + UniqueTestRetryJob::$handled = false; + $this->artisan('queue:work', [ + 'connection' => 'database', + '--once' => true, + ]); + + $this->assertTrue($job::$handled); + $this->assertTrue($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get()); + } + + public function testLockIsNotReleasedForJobReleases() + { + UniqueTestReleasedJob::$handled = false; + dispatch($job = new UniqueTestReleasedJob); + + $this->assertFalse($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get()); + + $this->artisan('queue:work', [ + 'connection' => 'database', + '--once' => true, + ]); + + $this->assertTrue($job::$handled); + $this->assertFalse($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get()); + + UniqueTestReleasedJob::$handled = false; + $this->artisan('queue:work', [ + 'connection' => 'database', + '--once' => true, + ]); + + $this->assertFalse($job::$handled); + $this->assertTrue($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get()); + } + + protected function getLockKey($job) + { + return 'unique:'.(is_string($job) ? $job : get_class($job)); + } +} + +class UniqueTestJob implements ShouldQueue, UniqueJob +{ + use InteractsWithQueue, Queueable, Dispatchable; + + public static $handled = false; + + public function handle() + { + static::$handled = true; + } +} + +class UniqueTestFailJob implements ShouldQueue, UniqueJob +{ + use InteractsWithQueue, Queueable, Dispatchable; + + public $tries = 1; + + public static $handled = false; + + public function handle() + { + static::$handled = true; + + throw new \Exception; + } +} + +class UniqueTestReleasedJob extends UniqueTestFailJob +{ + public $tries = 1; + + public $connection = 'database'; + + public function handle() + { + static::$handled = true; + + $this->release(); + } +} + +class UniqueTestRetryJob extends UniqueTestFailJob +{ + public $tries = 2; + + public $connection = 'database'; +}