Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] Add ability to dispatch unique jobs #35042

Merged
merged 9 commits into from Nov 4, 2020
8 changes: 8 additions & 0 deletions src/Illuminate/Contracts/Queue/UniqueJob.php
@@ -0,0 +1,8 @@
<?php

namespace Illuminate\Contracts\Queue;

interface UniqueJob
{
//
}
30 changes: 29 additions & 1 deletion src/Illuminate/Foundation/Bus/PendingDispatch.php
Expand Up @@ -2,7 +2,10 @@

namespace Illuminate\Foundation\Bus;

use Illuminate\Container\Container;
use Illuminate\Contracts\Bus\Dispatcher;
use Illuminate\Contracts\Cache\Repository as Cache;
use Illuminate\Contracts\Queue\UniqueJob;

class PendingDispatch
{
Expand Down Expand Up @@ -121,6 +124,29 @@ public function afterResponse()
return $this;
}

/**
* Determine if the job should be dispatched.
*
* @return bool
*/
protected function shouldDispatch()
{
if (! ($this->job instanceof UniqueJob)) {
return true;
}

$uniqueId = method_exists($this->job, 'uniqueId')
? $this->job->uniqueId()
: ($this->job->uniqueId ?? '');

$lock = Container::getInstance()->make(Cache::class)->lock(
$key = 'unique:'.get_class($this->job).$uniqueId,
$this->job->uniqueFor ?? 0
);

return (bool) $lock->get();
}

/**
* Dynamically proxy methods to the underlying job.
*
Expand All @@ -142,7 +168,9 @@ public function __call($method, $parameters)
*/
public function __destruct()
{
if ($this->afterResponse) {
if (! $this->shouldDispatch()) {
// Do nothing.
} elseif ($this->afterResponse) {
app(Dispatcher::class)->dispatchAfterResponse($this->job);
} else {
app(Dispatcher::class)->dispatch($this->job);
Expand Down
1 change: 1 addition & 0 deletions src/Illuminate/Foundation/Console/stubs/job.queued.stub
Expand Up @@ -4,6 +4,7 @@ namespace {{ namespace }};

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\UniqueJob;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
Expand Down
26 changes: 26 additions & 0 deletions src/Illuminate/Queue/CallQueuedHandler.php
Expand Up @@ -5,8 +5,10 @@
use Exception;
use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Bus\Dispatcher;
use Illuminate\Contracts\Cache\Repository as Cache;
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Contracts\Queue\UniqueJob;
use Illuminate\Database\Eloquent\ModelNotFoundException;
use Illuminate\Pipeline\Pipeline;
use ReflectionClass;
Expand Down Expand Up @@ -59,6 +61,10 @@ public function call(Job $job, array $data)

$this->dispatchThroughMiddleware($job, $command);

if (! $job->isReleased()) {
$this->ensureUniqueJobLockIsReleased($command);
}

if (! $job->hasFailed() && ! $job->isReleased()) {
$this->ensureNextJobInChainIsDispatched($command);
$this->ensureSuccessfulBatchJobIsRecorded($command);
Expand Down Expand Up @@ -153,6 +159,25 @@ protected function ensureSuccessfulBatchJobIsRecorded($command)
$command->batch()->recordSuccessfulJob($command->job->uuid());
}

/**
* Ensure the lock for the unique job is released.
*
* @param mixed $command
* @return void
*/
protected function ensureUniqueJobLockIsReleased($command)
{
if ($command instanceof UniqueJob) {
$uniqueId = method_exists($command, 'uniqueId')
? $command->uniqueId()
: ($command->uniqueId ?? '');

$this->container->make(Cache::class)
->lock('unique:'.get_class($command).$uniqueId)
->forceRelease();
}
}

/**
* Handle a model not found exception.
*
Expand Down Expand Up @@ -192,6 +217,7 @@ public function failed(array $data, $e, string $uuid)
{
$command = unserialize($data['command']);

$this->ensureUniqueJobLockIsReleased($command);
$this->ensureFailedBatchJobIsRecorded($uuid, $command, $e);
$this->ensureChainCatchCallbacksAreInvoked($uuid, $command, $e);

Expand Down
6 changes: 3 additions & 3 deletions tests/Integration/Queue/RateLimitedTest.php
Expand Up @@ -88,7 +88,7 @@ protected function assertJobRanSuccessfully($class)
$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('isReleased')->once()->andReturn(false);
$job->shouldReceive('isReleased')->andReturn(false);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false);
$job->shouldReceive('delete')->once();

Expand All @@ -108,7 +108,7 @@ protected function assertJobWasReleased($class)

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('release')->once();
$job->shouldReceive('isReleased')->once()->andReturn(true);
$job->shouldReceive('isReleased')->andReturn(true);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true);

$instance->call($job, [
Expand All @@ -126,7 +126,7 @@ protected function assertJobWasSkipped($class)
$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('isReleased')->once()->andReturn(false);
$job->shouldReceive('isReleased')->andReturn(false);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false);
$job->shouldReceive('delete')->once();

Expand Down
6 changes: 3 additions & 3 deletions tests/Integration/Queue/RateLimitedWithRedisTest.php
Expand Up @@ -119,7 +119,7 @@ protected function assertJobRanSuccessfully($testJob)
$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('isReleased')->once()->andReturn(false);
$job->shouldReceive('isReleased')->andReturn(false);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false);
$job->shouldReceive('delete')->once();

Expand All @@ -139,7 +139,7 @@ protected function assertJobWasReleased($testJob)

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('release')->once();
$job->shouldReceive('isReleased')->once()->andReturn(true);
$job->shouldReceive('isReleased')->andReturn(true);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true);

$instance->call($job, [
Expand All @@ -157,7 +157,7 @@ protected function assertJobWasSkipped($testJob)
$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('isReleased')->once()->andReturn(false);
$job->shouldReceive('isReleased')->andReturn(false);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false);
$job->shouldReceive('delete')->once();

Expand Down
196 changes: 196 additions & 0 deletions tests/Integration/Queue/UniqueJobTest.php
@@ -0,0 +1,196 @@
<?php

namespace Illuminate\Tests\Integration\Queue;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Cache\Repository as Cache;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\UniqueJob;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Bus;
use Mockery as m;
use Orchestra\Testbench\TestCase;

/**
* @group integration
*/
class UniqueJobTest extends TestCase
{
protected function getEnvironmentSetUp($app)
{
$app['config']->set('database.default', 'testbench');
$app['config']->set('database.connections.testbench', [
'driver' => 'sqlite',
'database' => ':memory:',
'prefix' => '',
]);

$app['db']->connection()->getSchemaBuilder()->create('jobs', function (Blueprint $table) {
$table->bigIncrements('id');
$table->string('queue');
$table->longText('payload');
$table->tinyInteger('attempts')->unsigned();
$table->unsignedInteger('reserved_at')->nullable();
$table->unsignedInteger('available_at');
$table->unsignedInteger('created_at');
$table->index(['queue', 'reserved_at']);
});
}

protected function tearDown(): void
{
$this->app['db']->connection()->getSchemaBuilder()->drop('jobs');

parent::tearDown();

m::close();
}

public function testUniqueJobsAreNotDispatched()
{
Bus::fake();
UniqueTestJob::dispatch();
Bus::assertDispatched(UniqueTestJob::class);

$this->assertFalse(
$this->app->get(Cache::class)->lock($this->getLockKey(UniqueTestJob::class), 10)->get()
);

Bus::fake();
UniqueTestJob::dispatch();
Bus::assertNotDispatched(UniqueTestJob::class);

$this->assertFalse(
$this->app->get(Cache::class)->lock($this->getLockKey(UniqueTestJob::class), 10)->get()
);
}

public function testLockIsReleasedForSuccessfulJobs()
{
UniqueTestJob::$handled = false;
dispatch($job = new UniqueTestJob);

$this->assertTrue($job::$handled);
$this->assertTrue($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get());
}

public function testLockIsReleasedForFailedJobs()
{
UniqueTestFailJob::$handled = false;

$this->expectException(\Exception::class);

try {
dispatch($job = new UniqueTestFailJob);
} finally {
$this->assertTrue($job::$handled);
$this->assertTrue($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get());
}
}

public function testLockIsNotReleasedForJobRetries()
{
UniqueTestRetryJob::$handled = false;
dispatch($job = new UniqueTestRetryJob);

$this->assertFalse($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get());

$this->artisan('queue:work', [
'connection' => 'database',
'--once' => true,
]);

$this->assertTrue($job::$handled);
$this->assertFalse($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get());

UniqueTestRetryJob::$handled = false;
$this->artisan('queue:work', [
'connection' => 'database',
'--once' => true,
]);

$this->assertTrue($job::$handled);
$this->assertTrue($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get());
}

public function testLockIsNotReleasedForJobReleases()
{
UniqueTestReleasedJob::$handled = false;
dispatch($job = new UniqueTestReleasedJob);

$this->assertFalse($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get());

$this->artisan('queue:work', [
'connection' => 'database',
'--once' => true,
]);

$this->assertTrue($job::$handled);
$this->assertFalse($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get());

UniqueTestReleasedJob::$handled = false;
$this->artisan('queue:work', [
'connection' => 'database',
'--once' => true,
]);

$this->assertFalse($job::$handled);
$this->assertTrue($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get());
}

protected function getLockKey($job)
{
return 'unique:'.(is_string($job) ? $job : get_class($job));
}
}

class UniqueTestJob implements ShouldQueue, UniqueJob
{
use InteractsWithQueue, Queueable, Dispatchable;

public static $handled = false;

public function handle()
{
static::$handled = true;
}
}

class UniqueTestFailJob implements ShouldQueue, UniqueJob
{
use InteractsWithQueue, Queueable, Dispatchable;

public $tries = 1;

public static $handled = false;

public function handle()
{
static::$handled = true;

throw new \Exception;
}
}

class UniqueTestReleasedJob extends UniqueTestFailJob
{
public $tries = 1;

public $connection = 'database';

public function handle()
{
static::$handled = true;

$this->release();
}
}

class UniqueTestRetryJob extends UniqueTestFailJob
{
public $tries = 2;

public $connection = 'database';
}