diff --git a/src/Illuminate/Queue/Middleware/RateLimitsJobs.php b/src/Illuminate/Queue/Middleware/RateLimitsJobs.php new file mode 100644 index 000000000000..46b512c0e5ef --- /dev/null +++ b/src/Illuminate/Queue/Middleware/RateLimitsJobs.php @@ -0,0 +1,102 @@ +limiter = Container::getInstance()->make(RateLimiter::class); + $this->limiterName = $limiterName; + } + + /** + * Process the job. + * + * @param mixed $job + * @param callable $next + * @return mixed + */ + public function handle($job, $next) + { + if (! is_null($limiter = $this->limiter->limiter($this->limiterName))) { + $limiterResponse = call_user_func($limiter, $job); + + if ($limiterResponse instanceof Unlimited) { + return $next($job); + } + + return $this->handleJob( + $job, + $next, + collect(Arr::wrap($limiterResponse))->map(function ($limit) { + return (object) [ + 'key' => md5($this->limiterName.$limit->key), + 'maxAttempts' => $limit->maxAttempts, + 'decayMinutes' => $limit->decayMinutes, + ]; + })->all() + ); + } else { + return $next($job); + } + } + + /** + * Handle a rate limited job. + * + * @param mixed $job + * @param callable $next + * @param array $limits + * @return mixed + */ + protected function handleJob($job, $next, array $limits) + { + foreach ($limits as $limit) { + if ($this->limiter->tooManyAttempts($limit->key, $limit->maxAttempts)) { + return $job->release($this->getTimeUntilNextRetry($limit->key)); + } + + $this->limiter->hit($limit->key, $limit->decayMinutes * 60); + } + + return $next($job); + } + + /** + * Get the number of seconds until the next retry. + * + * @param string $key + * @return int + */ + protected function getTimeUntilNextRetry($key) + { + return $this->limiter->availableIn($key); + } +} diff --git a/src/Illuminate/Queue/Middleware/RateLimitsJobsWithRedis.php b/src/Illuminate/Queue/Middleware/RateLimitsJobsWithRedis.php new file mode 100644 index 000000000000..0409d5cd9995 --- /dev/null +++ b/src/Illuminate/Queue/Middleware/RateLimitsJobsWithRedis.php @@ -0,0 +1,90 @@ +redis = Container::getInstance()->make(Redis::class); + } + + /** + * Handle a rate limited job. + * + * @param mixed $job + * @param callable $next + * @param array $limits + * @return mixed + */ + protected function handleJob($job, $next, array $limits) + { + foreach ($limits as $limit) { + if ($this->tooManyAttempts($limit->key, $limit->maxAttempts, $limit->decayMinutes)) { + return $job->release($this->getTimeUntilNextRetry($limit->key)); + } + } + + return $next($job); + } + + /** + * Determine if the given key has been "accessed" too many times. + * + * @param string $key + * @param int $maxAttempts + * @param int $decayMinutes + * @return bool + */ + protected function tooManyAttempts($key, $maxAttempts, $decayMinutes) + { + $limiter = new DurationLimiter( + $this->redis, $key, $maxAttempts, $decayMinutes * 60 + ); + + return tap(! $limiter->acquire(), function () use ($key, $limiter) { + $this->decaysAt[$key] = $limiter->decaysAt; + }); + } + + /** + * Get the number of seconds until the lock is released. + * + * @param string $key + * @return int + */ + protected function getTimeUntilNextRetry($key) + { + return $this->decaysAt[$key] - $this->currentTime(); + } +} diff --git a/tests/Integration/Queue/RateLimitsJobsTest.php b/tests/Integration/Queue/RateLimitsJobsTest.php new file mode 100644 index 000000000000..1922df3b8f59 --- /dev/null +++ b/tests/Integration/Queue/RateLimitsJobsTest.php @@ -0,0 +1,141 @@ +app->make(RateLimiter::class); + + $rateLimiter->for('test', function ($job) { + return Limit::none(); + }); + + $this->assertJobRanSuccessfully(RateLimitedTestJob::class); + $this->assertJobRanSuccessfully(RateLimitedTestJob::class); + } + + public function testRateLimitedJobsAreNotExecutedOnLimitReached() + { + $rateLimiter = $this->app->make(RateLimiter::class); + + $rateLimiter->for('test', function ($job) { + return Limit::perHour(1); + }); + + $this->assertJobRanSuccessfully(RateLimitedTestJob::class); + $this->assertJobWasReleased(RateLimitedTestJob::class); + } + + public function testJobsCanHaveConditionalRateLimits() + { + $rateLimiter = $this->app->make(RateLimiter::class); + + $rateLimiter->for('test', function ($job) { + if ($job->isAdmin()) { + return Limit::none(); + } + + return Limit::perHour(1); + }); + + $this->assertJobRanSuccessfully(AdminTestJob::class); + $this->assertJobRanSuccessfully(AdminTestJob::class); + + $this->assertJobRanSuccessfully(NonAdminTestJob::class); + $this->assertJobWasReleased(NonAdminTestJob::class); + } + + protected function assertJobRanSuccessfully($class) + { + $class::$handled = false; + $instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app); + + $job = m::mock(Job::class); + + $job->shouldReceive('hasFailed')->once()->andReturn(false); + $job->shouldReceive('isReleased')->once()->andReturn(false); + $job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false); + $job->shouldReceive('delete')->once(); + + $instance->call($job, [ + 'command' => serialize($command = new $class), + ]); + + $this->assertTrue($class::$handled); + } + + protected function assertJobWasReleased($class) + { + $class::$handled = false; + $instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app); + + $job = m::mock(Job::class); + + $job->shouldReceive('hasFailed')->once()->andReturn(false); + $job->shouldReceive('release')->once(); + $job->shouldReceive('isReleased')->once()->andReturn(true); + $job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true); + + $instance->call($job, [ + 'command' => serialize($command = new $class), + ]); + + $this->assertFalse($class::$handled); + } +} + +class RateLimitedTestJob +{ + use InteractsWithQueue, Queueable; + + public static $handled = false; + + public function handle() + { + static::$handled = true; + } + + public function middleware() + { + return [new RateLimitsJobs('test')]; + } +} + +class AdminTestJob extends RateLimitedTestJob +{ + public function isAdmin() + { + return true; + } +} + +class NonAdminTestJob extends RateLimitedTestJob +{ + public function isAdmin() + { + return false; + } +} diff --git a/tests/Integration/Queue/RateLimitsJobsWithRedisTest.php b/tests/Integration/Queue/RateLimitsJobsWithRedisTest.php new file mode 100644 index 000000000000..e3eb8a939394 --- /dev/null +++ b/tests/Integration/Queue/RateLimitsJobsWithRedisTest.php @@ -0,0 +1,177 @@ +setUpRedis(); + } + + protected function tearDown(): void + { + parent::tearDown(); + + $this->tearDownRedis(); + + m::close(); + } + + public function testUnlimitedJobsAreExecuted() + { + $rateLimiter = $this->app->make(RateLimiter::class); + + $testJob = new RedisRateLimitedTestJob; + + $rateLimiter->for($testJob->key, function ($job) { + return Limit::none(); + }); + + $this->assertJobRanSuccessfully($testJob); + $this->assertJobRanSuccessfully($testJob); + } + + public function testRateLimitedJobsAreNotExecutedOnLimitReached() + { + $rateLimiter = $this->app->make(RateLimiter::class); + + $testJob = new RedisRateLimitedTestJob; + + $rateLimiter->for($testJob->key, function ($job) { + return Limit::perMinute(1); + }); + + $this->assertJobRanSuccessfully($testJob); + $this->assertJobWasReleased($testJob); + } + + public function testJobsCanHaveConditionalRateLimits() + { + $rateLimiter = $this->app->make(RateLimiter::class); + + $adminJob = new RedisAdminTestJob; + + $rateLimiter->for($adminJob->key, function ($job) { + if ($job->isAdmin()) { + return Limit::none(); + } + + return Limit::perMinute(1); + }); + + $this->assertJobRanSuccessfully($adminJob); + $this->assertJobRanSuccessfully($adminJob); + + $nonAdminJob = new RedisNonAdminTestJob; + + $rateLimiter->for($nonAdminJob->key, function ($job) { + if ($job->isAdmin()) { + return Limit::none(); + } + + return Limit::perMinute(1); + }); + + $this->assertJobRanSuccessfully($nonAdminJob); + $this->assertJobWasReleased($nonAdminJob); + } + + protected function assertJobRanSuccessfully($testJob) + { + $testJob::$handled = false; + $instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app); + + $job = m::mock(Job::class); + + $job->shouldReceive('hasFailed')->once()->andReturn(false); + $job->shouldReceive('isReleased')->once()->andReturn(false); + $job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false); + $job->shouldReceive('delete')->once(); + + $instance->call($job, [ + 'command' => serialize($testJob), + ]); + + $this->assertTrue($testJob::$handled); + } + + protected function assertJobWasReleased($testJob) + { + $testJob::$handled = false; + $instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app); + + $job = m::mock(Job::class); + + $job->shouldReceive('hasFailed')->once()->andReturn(false); + $job->shouldReceive('release')->once(); + $job->shouldReceive('isReleased')->once()->andReturn(true); + $job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true); + + $instance->call($job, [ + 'command' => serialize($testJob), + ]); + + $this->assertFalse($testJob::$handled); + } +} + +class RedisRateLimitedTestJob +{ + use InteractsWithQueue, Queueable; + + public $key; + + public static $handled = false; + + public function __construct() + { + $this->key = Str::random(10); + } + + public function handle() + { + static::$handled = true; + } + + public function middleware() + { + return [new RateLimitsJobsWithRedis($this->key)]; + } +} + +class RedisAdminTestJob extends RedisRateLimitedTestJob +{ + public function isAdmin() + { + return true; + } +} + +class RedisNonAdminTestJob extends RedisRateLimitedTestJob +{ + public function isAdmin() + { + return false; + } +}