Skip to content

Commit

Permalink
Add Redis rate limiting job middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
paras-malhotra committed Oct 14, 2020
1 parent 7385ddd commit de94375
Show file tree
Hide file tree
Showing 2 changed files with 266 additions and 0 deletions.
90 changes: 90 additions & 0 deletions src/Illuminate/Queue/Middleware/RateLimitsJobsWithRedis.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php

namespace Illuminate\Queue\Middleware;

use Illuminate\Container\Container;
use Illuminate\Contracts\Redis\Factory as Redis;
use Illuminate\Redis\Limiters\DurationLimiter;
use Illuminate\Support\InteractsWithTime;

class RateLimitsJobsWithRedis extends RateLimitsJobs
{
use InteractsWithTime;

/**
* The Redis factory implementation.
*
* @var \Illuminate\Contracts\Redis\Factory
*/
protected $redis;

/**
* The timestamp of the end of the current duration by key.
*
* @var array
*/
public $decaysAt = [];

/**
* Create a new rate limiter middleware instance.
*
* @param string $limiterName
*
* @return void
*/
public function __construct($limiterName)
{
parent::__construct($limiterName);

$this->redis = Container::getInstance()->make(Redis::class);
}

/**
* Handle a rate limited job.
*
* @param mixed $job
* @param callable $next
* @param array $limits
* @return mixed
*/
protected function handleJob($job, $next, array $limits)
{
foreach ($limits as $limit) {
if ($this->tooManyAttempts($limit->key, $limit->maxAttempts, $limit->decayMinutes)) {
return $job->release($this->getTimeUntilNextRetry($limit->key));
}
}

return $next($job);
}

/**
* Determine if the given key has been "accessed" too many times.
*
* @param string $key
* @param int $maxAttempts
* @param int $decayMinutes
* @return bool
*/
protected function tooManyAttempts($key, $maxAttempts, $decayMinutes)
{
$limiter = new DurationLimiter(
$this->redis, $key, $maxAttempts, $decayMinutes * 60
);

return tap(! $limiter->acquire(), function () use ($key, $limiter) {
$this->decaysAt[$key] = $limiter->decaysAt;
});
}

/**
* Get the number of seconds until the lock is released.
*
* @param string $key
* @return int
*/
protected function getTimeUntilNextRetry($key)
{
return $this->decaysAt[$key] - $this->currentTime();
}
}
176 changes: 176 additions & 0 deletions tests/Integration/Queue/RateLimitsJobsWithRedisTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
<?php

namespace Illuminate\Tests\Integration\Queue;

use Illuminate\Bus\Dispatcher;
use Illuminate\Bus\Queueable;
use Illuminate\Cache\RateLimiter;
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Foundation\Testing\Concerns\InteractsWithRedis;
use Illuminate\Queue\CallQueuedHandler;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\RateLimitsJobsWithRedis;
use Illuminate\Support\Str;
use Mockery as m;
use Orchestra\Testbench\TestCase;

/**
* @group integration
*/
class RateLimitsJobsWithRedisTest extends TestCase
{
use InteractsWithRedis;

protected function setUp(): void
{
parent::setUp();

$this->setUpRedis();
}

protected function tearDown(): void
{
parent::tearDown();

$this->tearDownRedis();

m::close();
}

public function testUnlimitedJobsAreExecuted()
{
$rateLimiter = $this->app->make(RateLimiter::class);

$testJob = new RateLimitedTestJob;

$rateLimiter->for($testJob->key, function ($job) {
return Limit::none();
});

$this->assertJobRanSuccessfully($testJob);
$this->assertJobRanSuccessfully($testJob);
}

public function testRateLimitedJobsAreNotExecutedOnLimitReached()
{
$rateLimiter = $this->app->make(RateLimiter::class);

$testJob = new RateLimitedTestJob;

$rateLimiter->for($testJob->key, function ($job) {
return Limit::perMinute(1);
});

$this->assertJobRanSuccessfully($testJob);
$this->assertJobWasReleased($testJob);
}

public function testJobsCanHaveConditionalRateLimits()
{
$rateLimiter = $this->app->make(RateLimiter::class);

$adminJob = new AdminTestJob;

$rateLimiter->for($adminJob->key, function ($job) {
if ($job->isAdmin()) {
return Limit::none();
}

return Limit::perMinute(1);
});

$this->assertJobRanSuccessfully($adminJob);
$this->assertJobRanSuccessfully($adminJob);

$nonAdminJob = new NonAdminTestJob;

$rateLimiter->for($nonAdminJob->key, function ($job) {
if ($job->isAdmin()) {
return Limit::none();
}

return Limit::perMinute(1);
});

$this->assertJobRanSuccessfully($nonAdminJob);
$this->assertJobWasReleased($nonAdminJob);
}

protected function assertJobRanSuccessfully($testJob)
{
$testJob::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('isReleased')->once()->andReturn(false);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false);
$job->shouldReceive('delete')->once();

$instance->call($job, [
'command' => serialize($testJob),
]);

$this->assertTrue($testJob::$handled);
}

protected function assertJobWasReleased($testJob)
{
$testJob::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('release')->once();
$job->shouldReceive('isReleased')->once()->andReturn(true);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true);

$instance->call($job, [
'command' => serialize($testJob),
]);

$this->assertFalse($testJob::$handled);
}
}

class RateLimitedTestJob
{
use InteractsWithQueue, Queueable;

public $key;

public static $handled = false;

public function __construct() {
$this->key = Str::random(10);
}

public function handle()
{
static::$handled = true;
}

public function middleware()
{
return [new RateLimitsJobsWithRedis($this->key)];
}
}

class AdminTestJob extends RateLimitedTestJob
{
public function isAdmin()
{
return true;
}
}

class NonAdminTestJob extends RateLimitedTestJob
{
public function isAdmin()
{
return false;
}
}

0 comments on commit de94375

Please sign in to comment.