Skip to content

Commit

Permalink
fix #49890: ShouldBeUnique behavior for missing models
Browse files Browse the repository at this point in the history
  • Loading branch information
naquad committed Feb 23, 2024
1 parent e0be50a commit 15e2d15
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 20 deletions.
6 changes: 5 additions & 1 deletion src/Illuminate/Bus/UniqueLock.php
Expand Up @@ -70,6 +70,10 @@ protected function getKey($job)
? $job->uniqueId()
: ($job->uniqueId ?? '');

return 'laravel_unique_job:'.get_class($job).$uniqueId;
$jobName = property_exists($job, 'jobName')
? $job->jobName
: get_class($job);

return 'laravel_unique_job:'.$jobName.$uniqueId;
}
}
66 changes: 50 additions & 16 deletions src/Illuminate/Queue/CallQueuedHandler.php
Expand Up @@ -4,13 +4,10 @@

use Exception;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\UniqueLock;
use Illuminate\Contracts\Bus\Dispatcher;
use Illuminate\Contracts\Cache\Repository as Cache;
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Encryption\Encrypter;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
use Illuminate\Database\Eloquent\ModelNotFoundException;
use Illuminate\Pipeline\Pipeline;
Expand Down Expand Up @@ -60,17 +57,19 @@ public function call(Job $job, array $data)
$job, $this->getCommand($data)
);
} catch (ModelNotFoundException $e) {
$this->ensureUniqueJobLockIsReleased($data);

return $this->handleModelNotFound($job, $e);
}

if ($command instanceof ShouldBeUniqueUntilProcessing) {
$this->ensureUniqueJobLockIsReleased($command);
$this->ensureUniqueJobLockIsReleased($data);
}

$this->dispatchThroughMiddleware($job, $command);

if (! $job->isReleased() && ! $command instanceof ShouldBeUniqueUntilProcessing) {
$this->ensureUniqueJobLockIsReleased($command);
$this->ensureUniqueJobLockIsReleased($data);
}

if (! $job->hasFailed() && ! $job->isReleased()) {
Expand All @@ -83,6 +82,32 @@ public function call(Job $job, array $data)
}
}

/**
* Get the unserialized object from the given payload.
*
* @param string $key
* @param array $data
* @return mixed
*/
protected function getUnserializedItem(string $key, array $data)
{
if (isset($data[$key])) {
if (
str_starts_with($data[$key], 'O:') ||
$data[$key] == 'N;'
) {
return unserialize($data[$key]);
}

if ($this->container->bound(Encrypter::class)) {
return unserialize($this->container[Encrypter::class]
->decrypt($data[$key]));
}
}

return null;
}

/**
* Get the command from the given payload.
*
Expand All @@ -93,17 +118,25 @@ public function call(Job $job, array $data)
*/
protected function getCommand(array $data)
{
if (str_starts_with($data['command'], 'O:')) {
return unserialize($data['command']);
}

if ($this->container->bound(Encrypter::class)) {
return unserialize($this->container[Encrypter::class]->decrypt($data['command']));
$command = $this->getUnserializedItem('command', $data);
if ($command !== null) {
return $command;
}

throw new RuntimeException('Unable to extract job payload.');
}

/**
* Get the unique handler from the given payload.
*
* @param array $data
* @return \Illuminate\Queue\UniqueHandler|null
*/
protected function getUniqueHandler(array $data)
{
return $this->getUnserializedItem('uniqueHandler', $data);
}

/**
* Dispatch the given job / command through its specified middleware.
*
Expand Down Expand Up @@ -196,13 +229,14 @@ protected function ensureSuccessfulBatchJobIsRecorded($command)
/**
* Ensure the lock for a unique job is released.
*
* @param mixed $command
* @param array $data
* @return void
*/
protected function ensureUniqueJobLockIsReleased($command)
protected function ensureUniqueJobLockIsReleased($data)
{
if ($command instanceof ShouldBeUnique) {
(new UniqueLock($this->container->make(Cache::class)))->release($command);
$handler = $this->getUniqueHandler($data);
if ($handler !== null) {
$handler->withContainer($this->container)->release();
}
}

Expand Down Expand Up @@ -246,7 +280,7 @@ public function failed(array $data, $e, string $uuid)
$command = $this->getCommand($data);

if (! $command instanceof ShouldBeUniqueUntilProcessing) {
$this->ensureUniqueJobLockIsReleased($command);
$this->ensureUniqueJobLockIsReleased($data);
}

if ($command instanceof \__PHP_Incomplete_Class) {
Expand Down
18 changes: 15 additions & 3 deletions src/Illuminate/Queue/Queue.php
Expand Up @@ -139,6 +139,8 @@ protected function createPayloadArray($job, $queue, $data = '')
*/
protected function createObjectPayload($job, $queue)
{
$handler = UniqueHandler::forJob($job);

$payload = $this->withCreatePayloadHooks($queue, [
'uuid' => (string) Str::uuid(),
'displayName' => $this->getDisplayName($job),
Expand All @@ -150,17 +152,27 @@ protected function createObjectPayload($job, $queue)
'timeout' => $job->timeout ?? null,
'retryUntil' => $this->getJobExpiration($job),
'data' => [
'uniqueHandler' => $handler,
'commandName' => $job,
'command' => $job,
],
]);

$command = $this->jobShouldBeEncrypted($job) && $this->container->bound(Encrypter::class)
? $this->container[Encrypter::class]->encrypt(serialize(clone $job))
: serialize(clone $job);
$handler = serialize($handler);
$command = serialize($job);

if (
$this->jobShouldBeEncrypted($job) &&
$this->container->bound(Encrypter::class)
) {
$encrypter = $this->container[Encrypter::class];
$handler = $encrypter->encrypt($handler);
$command = $encrypter->encrypt($command);
}

return array_merge($payload, [
'data' => array_merge($payload['data'], [
'uniqueHandler' => $handler,
'commandName' => get_class($job),
'command' => $command,
]),
Expand Down
116 changes: 116 additions & 0 deletions src/Illuminate/Queue/UniqueHandler.php
@@ -0,0 +1,116 @@
<?php

namespace Illuminate\Queue;

use Illuminate\Bus\UniqueLock;
use Illuminate\Contracts\Cache\Factory as CacheFactory;
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;

/**
* A helper class to manage the unique ID and cache instance for a job
* base on the data of the job itself.
*/
class UniqueHandler
{
/**
* Original job name.
*
* @var string
*/
public $jobName;

/**
* The unique ID for the job.
*
* @var string|null
*/
public $uniqueId = null;

/**
* Cache connection name for the job.
*
* @var string|null
*/
protected $uniqueVia = null;

/**
* The container instance.
*
* @var \Illuminate\Contracts\Container\Container
*/
protected $container;

/**
* Create a new handler instance.
*
* @param object $job
*/
public function __construct(object $job)
{
$this->jobName = get_class($job);

if (method_exists($job, 'uniqueId')) {
$this->uniqueId = $job->uniqueId();
} elseif (isset($job->uniqueId)) {
$this->uniqueId = $job->uniqueId;
}

if (method_exists($job, 'uniqueVia')) {
$this->uniqueVia = $job->uniqueVia()->getName();
}
}

/**
* Creates a new instance if the job should be unique.
*
* @param object $job
* @return \Illuminate\Queue\UniqueHandler|null
*/
public static function forJob(object $job)
{
if (
$job instanceof ShouldBeUnique ||
$job instanceof ShouldBeUniqueUntilProcessing
) {
return new static($job);
}

return null;
}

/**
* Sets the container instance.
*
* @param \Illuminate\Contracts\Container\Container $container
* @return \Illuminate\Queue\UpdateHandler
*/
public function withContainer(Container $container)
{
$this->container = $container;

return $this;
}

/**
* Returns the cache instance for the job.
*
* @return \Illuminate\Contracts\Cache\Repository
*/
protected function getCacheStore()
{
return $this->container->make(CacheFactory::class)
->store($this->uniqueVia);
}

/**
* Releases the lock for the job.
*
* @return void
*/
public function release()
{
(new UniqueLock($this->getCacheStore()))->release($this);
}
}
27 changes: 27 additions & 0 deletions tests/Integration/Queue/UniqueJobTest.php
Expand Up @@ -8,6 +8,7 @@
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Database\Eloquent\ModelNotFoundException;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Bus;
Expand Down Expand Up @@ -129,6 +130,22 @@ public function testLockCanBeReleasedBeforeProcessing()
$this->assertTrue($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get());
}

public function testLockIsReleasedForJobsWithMissingModels()
{
$this->markTestSkippedWhenUsingSyncQueueDriver();

UniqueUntilStartTestJob::$handled = false;

dispatch($job = new UniqueWithModelMissing);

$this->assertFalse($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get());

$this->runQueueWorkerCommand(['--once' => true]);

$this->assertFalse($job::$handled);
$this->assertTrue($this->app->get(Cache::class)->lock($this->getLockKey($job), 10)->get());
}

protected function getLockKey($job)
{
return 'laravel_unique_job:'.(is_string($job) ? $job : get_class($job));
Expand Down Expand Up @@ -184,3 +201,13 @@ class UniqueUntilStartTestJob extends UniqueTestJob implements ShouldBeUniqueUnt
{
public $tries = 2;
}

class UniqueWithModelMissing extends UniqueTestJob implements ShouldQueue, ShouldBeUnique
{
public $deleteWhenMissingModels = true;

public function __wakeup()
{
throw new ModelNotFoundException('test error');
}
}

0 comments on commit 15e2d15

Please sign in to comment.