Skip to content

Commit

Permalink
feature #49725 [Messenger] Add support for the DelayStamp in InMemory…
Browse files Browse the repository at this point in the history
…Transport (fabpot)

This PR was merged into the 6.3 branch.

Discussion
----------

[Messenger] Add support for the DelayStamp in InMemoryTransport

| Q             | A
| ------------- | ---
| Branch?       | 6.3
| Bug fix?      | no
| New feature?  | yes <!-- please update src/**/CHANGELOG.md files -->
| Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tickets       | n/a
| License       | MIT
| Doc PR        | n/a

Commits
-------

1025f35 [Messenger] Add support for the DelayStamp in InMemoryTransport
  • Loading branch information
fabpot committed Mar 19, 2023
2 parents 5fbf9be + 1025f35 commit 3276a3d
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Tests\Fixtures\AnEnvelopeStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
Expand Down Expand Up @@ -84,6 +85,15 @@ public function testQueue()
$this->assertSame([], $this->transport->get());
}

public function testQueueWithDelay()
{
$envelope1 = new Envelope(new \stdClass());
$envelope1 = $this->transport->send($envelope1);
$envelope2 = (new Envelope(new \stdClass()))->with(new DelayStamp(10_000));
$envelope2 = $this->transport->send($envelope2);
$this->assertSame([$envelope1], $this->transport->get());
}

public function testQueueWithSerialization()
{
$envelope = new Envelope(new \stdClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@

namespace Symfony\Component\Messenger\Transport\InMemory;

use Psr\Clock\ClockInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
Expand Down Expand Up @@ -46,16 +48,25 @@ class InMemoryTransport implements TransportInterface, ResetInterface
private array $queue = [];

private int $nextId = 1;
private ?SerializerInterface $serializer;
private array $availableAt = [];

public function __construct(SerializerInterface $serializer = null)
{
$this->serializer = $serializer;
public function __construct(
private ?SerializerInterface $serializer = null,
private ?ClockInterface $clock = null,
) {
}

public function get(): iterable
{
return array_values($this->decode($this->queue));
$envelopes = [];
$now = $this->clock?->now() ?? new \DateTimeImmutable();
foreach ($this->decode($this->queue) as $id => $envelope) {
if (!isset($this->availableAt[$id]) || $now > $this->availableAt[$id]) {
$envelopes[] = $envelope;
}
}

return $envelopes;
}

public function ack(Envelope $envelope): void
Expand All @@ -66,7 +77,7 @@ public function ack(Envelope $envelope): void
throw new LogicException('No TransportMessageIdStamp found on the Envelope.');
}

unset($this->queue[$transportMessageIdStamp->getId()]);
unset($this->queue[$id = $transportMessageIdStamp->getId()], $this->availableAt[$id]);
}

public function reject(Envelope $envelope): void
Expand All @@ -77,7 +88,7 @@ public function reject(Envelope $envelope): void
throw new LogicException('No TransportMessageIdStamp found on the Envelope.');
}

unset($this->queue[$transportMessageIdStamp->getId()]);
unset($this->queue[$id = $transportMessageIdStamp->getId()], $this->availableAt[$id]);
}

public function send(Envelope $envelope): Envelope
Expand All @@ -88,6 +99,12 @@ public function send(Envelope $envelope): Envelope
$this->sent[] = $encodedEnvelope;
$this->queue[$id] = $encodedEnvelope;

/** @var DelayStamp|null $delayStamp */
if ($delayStamp = $envelope->last(DelayStamp::class)) {
$now = $this->clock?->now() ?? new \DateTimeImmutable();
$this->availableAt[$id] = $now->modify(sprintf('+%d seconds', $delayStamp->getDelay() / 1000));
}

return $envelope;
}

Expand Down

0 comments on commit 3276a3d

Please sign in to comment.