From ad6f8532c69594954a3332120555781037b3f8fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20Deruss=C3=A9?= Date: Wed, 13 May 2020 19:00:08 +0200 Subject: [PATCH] [Messenger] Do not stack retry stamp --- .../SendFailedMessageForRetryListener.php | 31 ++++++++++++++-- .../SendFailedMessageForRetryListenerTest.php | 36 +++++++++++++++++++ 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php index 5e654b51d4ba..7a4e3c1563d9 100644 --- a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php +++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php @@ -21,6 +21,7 @@ use Symfony\Component\Messenger\Retry\RetryStrategyInterface; use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; +use Symfony\Component\Messenger\Stamp\StampInterface; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; /** @@ -31,12 +32,14 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface private $sendersLocator; private $retryStrategyLocator; private $logger; + private $historySize; - public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null) + public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, int $historySize = 10) { $this->sendersLocator = $sendersLocator; $this->retryStrategyLocator = $retryStrategyLocator; $this->logger = $logger; + $this->historySize = $historySize; } public function onMessageFailed(WorkerMessageFailedEvent $event) @@ -64,7 +67,7 @@ public function onMessageFailed(WorkerMessageFailedEvent $event) } // add the delay and retry stamp info - $retryEnvelope = $envelope->with(new DelayStamp($delay), new RedeliveryStamp($retryCount)); + $retryEnvelope = $this->withLimitedHistory($envelope, new DelayStamp($delay), new RedeliveryStamp($retryCount)); // re-send the message for retry $this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope); @@ -75,6 +78,30 @@ public function onMessageFailed(WorkerMessageFailedEvent $event) } } + /** + * Adds stamps to the envelope by keeping only the First + Last N stamps + */ + private function withLimitedHistory(Envelope $envelope, StampInterface ...$stamps): Envelope + { + foreach ($stamps as $stamp) { + $history = $envelope->all(get_class($stamp)); + if (\count($history) < $this->historySize) { + $envelope = $envelope->with($stamp); + continue; + } + + $history = \array_merge( + [$history[0]], + \array_slice($history, -$this->historySize + 2), + [$stamp] + ); + + $envelope = $envelope->withoutAll(get_class($stamp))->with(...$history); + } + + return $envelope; + } + public static function getSubscribedEvents() { return [ diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php index 7008b48a0950..20aae261ca8f 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php @@ -76,4 +76,40 @@ public function testEnvelopeIsSentToTransportOnRetry() $listener->onMessageFailed($event); } + + public function testEnvelopeKeepOnlyTheLast10Stamps() + { + $exception = new \Exception('no!'); + $stamps = \array_merge( + \array_fill(0, 15, new DelayStamp(1)), + \array_fill(0, 3, new RedeliveryStamp(1)) + ); + $envelope = new Envelope(new \stdClass(), $stamps); + + $sender = $this->createMock(SenderInterface::class); + $sender->expects($this->once())->method('send')->willReturnCallback(function (Envelope $envelope) { + $delayStamps = $envelope->all(DelayStamp::class); + $redeliveryStamps = $envelope->all(RedeliveryStamp::class); + + $this->assertCount(10, $delayStamps); + $this->assertCount(4, $redeliveryStamps); + + return $envelope; + }); + $senderLocator = $this->createMock(ContainerInterface::class); + $senderLocator->expects($this->once())->method('has')->willReturn(true); + $senderLocator->expects($this->once())->method('get')->willReturn($sender); + $retryStategy = $this->createMock(RetryStrategyInterface::class); + $retryStategy->expects($this->once())->method('isRetryable')->willReturn(true); + $retryStategy->expects($this->once())->method('getWaitingTime')->willReturn(1000); + $retryStrategyLocator = $this->createMock(ContainerInterface::class); + $retryStrategyLocator->expects($this->once())->method('has')->willReturn(true); + $retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy); + + $listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator); + + $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); + + $listener->onMessageFailed($event); + } }