Skip to content

Commit

Permalink
bug #36810 [Messenger] Do not stack retry stamp (jderusse)
Browse files Browse the repository at this point in the history
This PR was squashed before being merged into the 4.4 branch.

Discussion
----------

[Messenger] Do not stack retry stamp

| Q             | A
| ------------- | ---
| Branch?       | 4.4
| Bug fix?      | yes
| New feature?  | no
| Deprecations? | no
| Tickets       | /
| License       | MIT
| Doc PR        | /

With the "RecoverableException" or a very high number of retry, the message is currently stacking a lot of stamp, which increase the size of the message sent to queue and (in my case) reach the "maximum size allowed" after 60 retries + php serializer

This PR removes previous stamps before adding the new Delay+RetryStamps.

Commits
-------

ad6f853 [Messenger] Do not stack retry stamp
  • Loading branch information
fabpot committed Aug 17, 2020
2 parents 9d995bd + ad6f853 commit df3ab76
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 2 deletions.
Expand Up @@ -21,6 +21,7 @@
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;

/**
Expand All @@ -31,12 +32,14 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
private $sendersLocator;
private $retryStrategyLocator;
private $logger;
private $historySize;

public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null)
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, int $historySize = 10)
{
$this->sendersLocator = $sendersLocator;
$this->retryStrategyLocator = $retryStrategyLocator;
$this->logger = $logger;
$this->historySize = $historySize;
}

public function onMessageFailed(WorkerMessageFailedEvent $event)
Expand Down Expand Up @@ -64,7 +67,7 @@ public function onMessageFailed(WorkerMessageFailedEvent $event)
}

// add the delay and retry stamp info
$retryEnvelope = $envelope->with(new DelayStamp($delay), new RedeliveryStamp($retryCount));
$retryEnvelope = $this->withLimitedHistory($envelope, new DelayStamp($delay), new RedeliveryStamp($retryCount));

// re-send the message for retry
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
Expand All @@ -75,6 +78,30 @@ public function onMessageFailed(WorkerMessageFailedEvent $event)
}
}

/**
* Adds stamps to the envelope by keeping only the First + Last N stamps
*/
private function withLimitedHistory(Envelope $envelope, StampInterface ...$stamps): Envelope
{
foreach ($stamps as $stamp) {
$history = $envelope->all(get_class($stamp));
if (\count($history) < $this->historySize) {
$envelope = $envelope->with($stamp);
continue;
}

$history = \array_merge(
[$history[0]],
\array_slice($history, -$this->historySize + 2),
[$stamp]
);

$envelope = $envelope->withoutAll(get_class($stamp))->with(...$history);
}

return $envelope;
}

public static function getSubscribedEvents()
{
return [
Expand Down
Expand Up @@ -76,4 +76,40 @@ public function testEnvelopeIsSentToTransportOnRetry()

$listener->onMessageFailed($event);
}

public function testEnvelopeKeepOnlyTheLast10Stamps()
{
$exception = new \Exception('no!');
$stamps = \array_merge(
\array_fill(0, 15, new DelayStamp(1)),
\array_fill(0, 3, new RedeliveryStamp(1))
);
$envelope = new Envelope(new \stdClass(), $stamps);

$sender = $this->createMock(SenderInterface::class);
$sender->expects($this->once())->method('send')->willReturnCallback(function (Envelope $envelope) {
$delayStamps = $envelope->all(DelayStamp::class);
$redeliveryStamps = $envelope->all(RedeliveryStamp::class);

$this->assertCount(10, $delayStamps);
$this->assertCount(4, $redeliveryStamps);

return $envelope;
});
$senderLocator = $this->createMock(ContainerInterface::class);
$senderLocator->expects($this->once())->method('has')->willReturn(true);
$senderLocator->expects($this->once())->method('get')->willReturn($sender);
$retryStategy = $this->createMock(RetryStrategyInterface::class);
$retryStategy->expects($this->once())->method('isRetryable')->willReturn(true);
$retryStategy->expects($this->once())->method('getWaitingTime')->willReturn(1000);
$retryStrategyLocator = $this->createMock(ContainerInterface::class);
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
$retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);

$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);

$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);

$listener->onMessageFailed($event);
}
}

0 comments on commit df3ab76

Please sign in to comment.