From 9940e71ae1a036a6dc927ffbf4ef6ecd2bd90298 Mon Sep 17 00:00:00 2001 From: Ryan Weaver Date: Wed, 1 May 2019 15:33:07 -0400 Subject: [PATCH] fixing a bug where the delay queue name did not contain the final routing key --- .../AmqpExt/AmqpExtIntegrationTest.php | 68 +++++++++++++++---- .../Transport/AmqpExt/Connection.php | 38 +++++++---- 2 files changed, 80 insertions(+), 26 deletions(-) diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php index 0addadd6442b..57c44d4ed29d 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php @@ -19,7 +19,9 @@ use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender; +use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp; use Symfony\Component\Messenger\Transport\AmqpExt\Connection; +use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\Serializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Process\PhpProcess; @@ -84,8 +86,10 @@ public function testRetryAndDelay() $sender = new AmqpSender($connection, $serializer); $receiver = new AmqpReceiver($connection, $serializer); + // send a first message $sender->send($first = new Envelope(new DummyMessage('First'))); + // receive it immediately and imitate a redeliver with 2 second delay $envelopes = iterator_to_array($receiver->get()); /** @var Envelope $envelope */ $envelope = $envelopes[0]; @@ -95,25 +99,36 @@ public function testRetryAndDelay() $sender->send($newEnvelope); $receiver->ack($envelope); - $envelopes = []; - $startTime = time(); - // wait for next message, but only for max 3 seconds - while (0 === \count($envelopes) && $startTime + 3 > time()) { - $envelopes = iterator_to_array($receiver->get()); - } + // send a 2nd message with a shorter delay and custom routing key + $customRoutingKeyMessage = new DummyMessage('custom routing key'); + $envelopeCustomRoutingKey = new Envelope($customRoutingKeyMessage, [ + new DelayStamp(1000), + new AmqpStamp('my_custom_routing_key'), + ]); + $sender->send($envelopeCustomRoutingKey); + + // wait for next message (but max at 3 seconds) + $startTime = microtime(true); + $envelopes = $this->receiveEnvelopes($receiver, 3); + + // duration should be about 1 second + $this->assertApproximateDuration($startTime, 1); + // this should be the custom routing key message first $this->assertCount(1, $envelopes); - /** @var Envelope $envelope */ - $envelope = $envelopes[0]; + /* @var Envelope $envelope */ + $receiver->ack($envelopes[0]); + $this->assertEquals($customRoutingKeyMessage, $envelopes[0]->getMessage()); - // should have a 2 second delay - $this->assertGreaterThanOrEqual($startTime + 2, time()); - // but only a 2 second delay - $this->assertLessThan($startTime + 4, time()); + // wait for final message (but max at 3 seconds) + $envelopes = $this->receiveEnvelopes($receiver, 3); + // duration should be about 2 seconds + $this->assertApproximateDuration($startTime, 2); - /** @var RedeliveryStamp|null $retryStamp */ + /* @var RedeliveryStamp|null $retryStamp */ // verify the stamp still exists from the last send - $retryStamp = $envelope->last(RedeliveryStamp::class); + $this->assertCount(1, $envelopes); + $retryStamp = $envelopes[0]->last(RedeliveryStamp::class); $this->assertNotNull($retryStamp); $this->assertSame(1, $retryStamp->getRetryCount()); @@ -206,4 +221,29 @@ private function createSerializer(): SerializerInterface new SerializerComponent\Serializer([new ObjectNormalizer(), new ArrayDenormalizer()], ['json' => new JsonEncoder()]) ); } + + private function assertApproximateDuration($startTime, int $expectedDuration) + { + $actualDuration = microtime(true) - $startTime; + + if (\method_exists([$this, 'assertEqualsWithDelta'])) { + $this->assertEqualsWithDelta($expectedDuration, $actualDuration, 'Duration was not within expected range', .5); + } else { + $this->assertEquals($expectedDuration, $actualDuration, 'Duration was not within expected range', .5); + } + } + + /** + * @return Envelope[] + */ + private function receiveEnvelopes(ReceiverInterface $receiver, int $timeout): array + { + $envelopes = []; + $startTime = microtime(true); + while (0 === \count($envelopes) && $startTime + $timeout > time()) { + $envelopes = iterator_to_array($receiver->get()); + } + + return $envelopes; + } } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index ea9ba1f02af5..bdd04c56c964 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -79,8 +79,8 @@ class Connection * * flags: Exchange flags (Default: AMQP_DURABLE) * * arguments: Extra arguments * * delay: - * * routing_key_pattern: The pattern of the routing key (Default: "delay_%delay%") - * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%delay%") + * * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%") + * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%") * * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry") * * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true) * * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000) @@ -90,9 +90,9 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar { $this->connectionOptions = array_replace_recursive([ 'delay' => [ - 'routing_key_pattern' => 'delay_%delay%', + 'routing_key_pattern' => 'delay_%routing_key%_%delay%', 'exchange_name' => 'delay', - 'queue_name_pattern' => 'delay_queue_%delay%', + 'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%', ], ], $connectionOptions); $this->exchangeOptions = $exchangeOptions; @@ -186,7 +186,7 @@ public function publish(string $body, array $headers = [], int $delay = 0, AmqpS $this->publishOnExchange( $this->exchange(), $body, - (null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey(), + $this->getRoutingKeyForMessage($amqpStamp), [ 'headers' => $headers, ], @@ -209,14 +209,16 @@ public function countMessagesInQueues(): int */ private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null) { + $routingKey = $this->getRoutingKeyForMessage($amqpStamp); + if ($this->shouldSetup()) { - $this->setupDelay($delay, null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null); + $this->setupDelay($delay, $routingKey); } $this->publishOnExchange( $this->getDelayExchange(), $body, - $this->getRoutingKeyForDelay($delay), + $this->getRoutingKeyForDelay($delay, $routingKey), [ 'headers' => $headers, ], @@ -245,7 +247,7 @@ private function setupDelay(int $delay, ?string $routingKey) $queue = $this->createDelayQueue($delay, $routingKey); $queue->declareQueue(); - $queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay)); + $queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay, $routingKey)); } private function getDelayExchange(): \AMQPExchange @@ -271,13 +273,16 @@ private function getDelayExchange(): \AMQPExchange private function createDelayQueue(int $delay, ?string $routingKey) { $queue = $this->amqpFactory->createQueue($this->channel()); - $queue->setName(str_replace('%delay%', $delay, $this->connectionOptions['delay']['queue_name_pattern'])); + $queue->setName(str_replace( + ['%delay%', '%routing_key%'], + [$delay, $routingKey ?: ''], + $this->connectionOptions['delay']['queue_name_pattern'] + )); $queue->setArguments([ 'x-message-ttl' => $delay, 'x-dead-letter-exchange' => $this->exchange()->getName(), ]); - $routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey(); if (null !== $routingKey) { // after being released from to DLX, this routing key will be used $queue->setArgument('x-dead-letter-routing-key', $routingKey); @@ -286,9 +291,13 @@ private function createDelayQueue(int $delay, ?string $routingKey) return $queue; } - private function getRoutingKeyForDelay(int $delay): string + private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): string { - return str_replace('%delay%', $delay, $this->connectionOptions['delay']['routing_key_pattern']); + return str_replace( + ['%delay%', '%routing_key%'], + [$delay, $finalRoutingKey ?: ''], + $this->connectionOptions['delay']['routing_key_pattern'] + ); } /** @@ -444,4 +453,9 @@ public function purgeQueues() $this->queue($queueName)->purge(); } } + + private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string + { + return (null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey(); + } }