Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] Add job middleware to prevent overlapping jobs #34794

Merged
merged 9 commits into from Oct 14, 2020
136 changes: 136 additions & 0 deletions src/Illuminate/Queue/Middleware/PreventOverlappingJobs.php
@@ -0,0 +1,136 @@
<?php

namespace Illuminate\Queue\Middleware;

use Illuminate\Contracts\Cache\Repository as Cache;

class PreventOverlappingJobs
{
/**
* The amount of time (in seconds) to expire the lock.
*
* @var int
*/
public $expiresAt;

/**
* The key of the job.
*
* @var string
*/
public $key;
paras-malhotra marked this conversation as resolved.
Show resolved Hide resolved

/**
* The prefix of the lock key.
*
* @var string
*/
public $prefix = 'overlap:';

/**
* The delay (in seconds) to release the job back to the queue.
*
* @var int|null
*/
public $releaseAfter;

/**
* Create a new overlapping jobs middleware instance.
*
* @param string $key
* @param int|null $releaseAfter
paras-malhotra marked this conversation as resolved.
Show resolved Hide resolved
* @param int $expiresAt
*
* @return void
*/
public function __construct($key = '', $releaseAfter = 0, $expiresAt = 0)
{
$this->key = $key;
$this->releaseAfter = $releaseAfter;
$this->expiresAt = $expiresAt;
}

/**
* Process the job.
*
* @param mixed $job
* @param callable $next
* @return mixed
*/
public function handle($job, $next)
{
$lock = app(Cache::class)->lock($this->getLockKey($job), $this->expiresAt);

if ($lock->get()) {
try {
$next($job);
} finally {
$lock->release();
}
} elseif (! is_null($this->releaseAfter)) {
$job->release($this->releaseAfter);
}
}

/**
* Do not release the job back to the queue.
*
* @return $this
*/
public function dontRelease()
{
$this->releaseAfter = null;

return $this;
}

/**
* Set the expiry (in seconds) of the lock key.
*
* @param int $expiresAt
* @return $this
*/
public function expireAt($expiresAt)
{
$this->expiresAt = $expiresAt;

return $this;
}

/**
* Set the delay (in seconds) to release the job back to the queue.
*
* @param int $releaseAfter
* @return $this
*/
public function releaseAfter($releaseAfter)
{
$this->releaseAfter = $releaseAfter;

return $this;
}

/**
* Set the prefix of the lock key.
*
* @param string $prefix
* @return $this
*/
public function withPrefix($prefix)
{
$this->prefix = $prefix;

return $this;
}

/**
* Get the lock key.
*
* @param mixed $job
paras-malhotra marked this conversation as resolved.
Show resolved Hide resolved
* @return string
*/
public function getLockKey($job)
{
return $this->prefix.get_class($job).':'.$this->key;
}
}
152 changes: 152 additions & 0 deletions tests/Integration/Queue/PreventOverlappingJobsTest.php
@@ -0,0 +1,152 @@
<?php

namespace Illuminate\Tests\Integration\Queue;

use Illuminate\Bus\Dispatcher;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Cache\Repository as Cache;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Queue\CallQueuedHandler;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\PreventOverlappingJobs;
use Mockery as m;
use Orchestra\Testbench\TestCase;

/**
* @group integration
*/
class PreventOverlappingJobsTest extends TestCase
{
protected function tearDown(): void
{
parent::tearDown();

m::close();
}

public function testNonOverlappingJobsAreExecuted()
{
OverlappingTestJob::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->andReturn(false);
$job->shouldReceive('isReleased')->andReturn(false);
$job->shouldReceive('isDeletedOrReleased')->andReturn(false);
$job->shouldReceive('delete')->once();

$instance->call($job, [
'command' => serialize($command = new OverlappingTestJob),
]);

$lockKey = (new PreventOverlappingJobs)->getLockKey($command);

$this->assertTrue(OverlappingTestJob::$handled);
$this->assertTrue($this->app->get(Cache::class)->lock($lockKey, 10)->acquire());
}

public function testLockIsReleasedOnJobExceptions()
{
FailedOverlappingTestJob::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->andReturn(false);
$job->shouldReceive('isReleased')->andReturn(false);
$job->shouldReceive('isDeletedOrReleased')->andReturn(false);

$this->expectException(\Exception::class);

try {
$instance->call($job, [
'command' => serialize($command = new FailedOverlappingTestJob),
]);
} finally {
$lockKey = (new PreventOverlappingJobs)->getLockKey($command);

$this->assertTrue(FailedOverlappingTestJob::$handled);
$this->assertTrue($this->app->get(Cache::class)->lock($lockKey, 10)->acquire());
}
}

public function testOverlappingJobsAreReleased()
{
OverlappingTestJob::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$lockKey = (new PreventOverlappingJobs)->getLockKey($command = new OverlappingTestJob);
$this->app->get(Cache::class)->lock($lockKey, 10)->acquire();

$job = m::mock(Job::class);

$job->shouldReceive('release')->once();
$job->shouldReceive('hasFailed')->andReturn(false);
$job->shouldReceive('isReleased')->andReturn(true);
$job->shouldReceive('isDeletedOrReleased')->andReturn(true);

$instance->call($job, [
'command' => serialize($command),
]);

$this->assertFalse(OverlappingTestJob::$handled);
}

public function testOverlappingJobsCanBeSkipped()
{
SkipOverlappingTestJob::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$lockKey = (new PreventOverlappingJobs)->getLockKey($command = new SkipOverlappingTestJob);
$this->app->get(Cache::class)->lock($lockKey, 10)->acquire();

$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->andReturn(false);
$job->shouldReceive('isReleased')->andReturn(false);
$job->shouldReceive('isDeletedOrReleased')->andReturn(false);
$job->shouldReceive('delete')->once();

$instance->call($job, [
'command' => serialize($command),
]);

$this->assertFalse(SkipOverlappingTestJob::$handled);
}
}

class OverlappingTestJob
{
use InteractsWithQueue, Queueable;

public static $handled = false;

public function handle()
{
static::$handled = true;
}

public function middleware()
{
return [new PreventOverlappingJobs];
}
}

class SkipOverlappingTestJob extends OverlappingTestJob
{
public function middleware()
{
return [(new PreventOverlappingJobs)->dontRelease()];
}
}

class FailedOverlappingTestJob extends OverlappingTestJob
{
public function handle()
{
static::$handled = true;

throw new \Exception;
}
}