Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] Bring Rate Limiters to Jobs #34829

Merged
merged 6 commits into from Oct 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
102 changes: 102 additions & 0 deletions src/Illuminate/Queue/Middleware/RateLimitsJobs.php
@@ -0,0 +1,102 @@
<?php

namespace Illuminate\Queue\Middleware;

use Illuminate\Cache\RateLimiter;
use Illuminate\Cache\RateLimiting\Unlimited;
use Illuminate\Container\Container;
use Illuminate\Support\Arr;

class RateLimitsJobs
{
/**
* The rate limiter instance.
*
* @var \Illuminate\Cache\RateLimiter
*/
protected $limiter;

/**
* The name of the rate limiter.
*
* @var string
*/
protected $limiterName;

/**
* Create a new rate limiter middleware instance.
*
* @param string $limiterName
*
* @return void
*/
public function __construct($limiterName)
{
$this->limiter = Container::getInstance()->make(RateLimiter::class);
$this->limiterName = $limiterName;
}

/**
* Process the job.
*
* @param mixed $job
* @param callable $next
* @return mixed
*/
public function handle($job, $next)
{
if (! is_null($limiter = $this->limiter->limiter($this->limiterName))) {
$limiterResponse = call_user_func($limiter, $job);

if ($limiterResponse instanceof Unlimited) {
return $next($job);
}

return $this->handleJob(
$job,
$next,
collect(Arr::wrap($limiterResponse))->map(function ($limit) {
return (object) [
'key' => md5($this->limiterName.$limit->key),
'maxAttempts' => $limit->maxAttempts,
'decayMinutes' => $limit->decayMinutes,
];
})->all()
);
} else {
return $next($job);
}
}

/**
* Handle a rate limited job.
*
* @param mixed $job
* @param callable $next
* @param array $limits
* @return mixed
*/
protected function handleJob($job, $next, array $limits)
{
foreach ($limits as $limit) {
if ($this->limiter->tooManyAttempts($limit->key, $limit->maxAttempts)) {
return $job->release($this->getTimeUntilNextRetry($limit->key));
}

$this->limiter->hit($limit->key, $limit->decayMinutes * 60);
}

return $next($job);
}

/**
* Get the number of seconds until the next retry.
*
* @param string $key
* @return int
*/
protected function getTimeUntilNextRetry($key)
{
return $this->limiter->availableIn($key);
}
}
90 changes: 90 additions & 0 deletions src/Illuminate/Queue/Middleware/RateLimitsJobsWithRedis.php
@@ -0,0 +1,90 @@
<?php

namespace Illuminate\Queue\Middleware;

use Illuminate\Container\Container;
use Illuminate\Contracts\Redis\Factory as Redis;
use Illuminate\Redis\Limiters\DurationLimiter;
use Illuminate\Support\InteractsWithTime;

class RateLimitsJobsWithRedis extends RateLimitsJobs
{
use InteractsWithTime;

/**
* The Redis factory implementation.
*
* @var \Illuminate\Contracts\Redis\Factory
*/
protected $redis;

/**
* The timestamp of the end of the current duration by key.
*
* @var array
*/
public $decaysAt = [];

/**
* Create a new rate limiter middleware instance.
*
* @param string $limiterName
*
* @return void
*/
public function __construct($limiterName)
{
parent::__construct($limiterName);

$this->redis = Container::getInstance()->make(Redis::class);
}

/**
* Handle a rate limited job.
*
* @param mixed $job
* @param callable $next
* @param array $limits
* @return mixed
*/
protected function handleJob($job, $next, array $limits)
{
foreach ($limits as $limit) {
if ($this->tooManyAttempts($limit->key, $limit->maxAttempts, $limit->decayMinutes)) {
return $job->release($this->getTimeUntilNextRetry($limit->key));
}
}

return $next($job);
}

/**
* Determine if the given key has been "accessed" too many times.
*
* @param string $key
* @param int $maxAttempts
* @param int $decayMinutes
* @return bool
*/
protected function tooManyAttempts($key, $maxAttempts, $decayMinutes)
{
$limiter = new DurationLimiter(
$this->redis, $key, $maxAttempts, $decayMinutes * 60
);

return tap(! $limiter->acquire(), function () use ($key, $limiter) {
$this->decaysAt[$key] = $limiter->decaysAt;
});
}

/**
* Get the number of seconds until the lock is released.
*
* @param string $key
* @return int
*/
protected function getTimeUntilNextRetry($key)
{
return $this->decaysAt[$key] - $this->currentTime();
}
}
141 changes: 141 additions & 0 deletions tests/Integration/Queue/RateLimitsJobsTest.php
@@ -0,0 +1,141 @@
<?php

namespace Illuminate\Tests\Integration\Queue;

use Illuminate\Bus\Dispatcher;
use Illuminate\Bus\Queueable;
use Illuminate\Cache\RateLimiter;
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Queue\CallQueuedHandler;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\RateLimitsJobs;
use Mockery as m;
use Orchestra\Testbench\TestCase;

/**
* @group integration
*/
class RateLimitsJobsTest extends TestCase
{
protected function tearDown(): void
{
parent::tearDown();

m::close();
}

public function testUnlimitedJobsAreExecuted()
{
$rateLimiter = $this->app->make(RateLimiter::class);

$rateLimiter->for('test', function ($job) {
return Limit::none();
});

$this->assertJobRanSuccessfully(RateLimitedTestJob::class);
$this->assertJobRanSuccessfully(RateLimitedTestJob::class);
}

public function testRateLimitedJobsAreNotExecutedOnLimitReached()
{
$rateLimiter = $this->app->make(RateLimiter::class);

$rateLimiter->for('test', function ($job) {
return Limit::perHour(1);
});

$this->assertJobRanSuccessfully(RateLimitedTestJob::class);
$this->assertJobWasReleased(RateLimitedTestJob::class);
}

public function testJobsCanHaveConditionalRateLimits()
{
$rateLimiter = $this->app->make(RateLimiter::class);

$rateLimiter->for('test', function ($job) {
if ($job->isAdmin()) {
return Limit::none();
}

return Limit::perHour(1);
});

$this->assertJobRanSuccessfully(AdminTestJob::class);
$this->assertJobRanSuccessfully(AdminTestJob::class);

$this->assertJobRanSuccessfully(NonAdminTestJob::class);
$this->assertJobWasReleased(NonAdminTestJob::class);
}

protected function assertJobRanSuccessfully($class)
{
$class::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('isReleased')->once()->andReturn(false);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false);
$job->shouldReceive('delete')->once();

$instance->call($job, [
'command' => serialize($command = new $class),
]);

$this->assertTrue($class::$handled);
}

protected function assertJobWasReleased($class)
{
$class::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('release')->once();
$job->shouldReceive('isReleased')->once()->andReturn(true);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true);

$instance->call($job, [
'command' => serialize($command = new $class),
]);

$this->assertFalse($class::$handled);
}
}

class RateLimitedTestJob
{
use InteractsWithQueue, Queueable;

public static $handled = false;

public function handle()
{
static::$handled = true;
}

public function middleware()
{
return [new RateLimitsJobs('test')];
}
}

class AdminTestJob extends RateLimitedTestJob
{
public function isAdmin()
{
return true;
}
}

class NonAdminTestJob extends RateLimitedTestJob
{
public function isAdmin()
{
return false;
}
}