Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messenger] Adding final routing key to delay queue name #31355

Merged
merged 1 commit into from May 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,7 +19,9 @@
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Process\PhpProcess;
Expand Down Expand Up @@ -84,8 +86,10 @@ public function testRetryAndDelay()
$sender = new AmqpSender($connection, $serializer);
$receiver = new AmqpReceiver($connection, $serializer);

// send a first message
$sender->send($first = new Envelope(new DummyMessage('First')));

// receive it immediately and imitate a redeliver with 2 second delay
$envelopes = iterator_to_array($receiver->get());
/** @var Envelope $envelope */
$envelope = $envelopes[0];
Expand All @@ -95,25 +99,36 @@ public function testRetryAndDelay()
$sender->send($newEnvelope);
$receiver->ack($envelope);

$envelopes = [];
$startTime = time();
// wait for next message, but only for max 3 seconds
while (0 === \count($envelopes) && $startTime + 3 > time()) {
$envelopes = iterator_to_array($receiver->get());
}
// send a 2nd message with a shorter delay and custom routing key
$customRoutingKeyMessage = new DummyMessage('custom routing key');
$envelopeCustomRoutingKey = new Envelope($customRoutingKeyMessage, [
new DelayStamp(1000),
new AmqpStamp('my_custom_routing_key'),
]);
$sender->send($envelopeCustomRoutingKey);

// wait for next message (but max at 3 seconds)
$startTime = microtime(true);
$envelopes = $this->receiveEnvelopes($receiver, 3);

// duration should be about 1 second
$this->assertApproximateDuration($startTime, 1);

// this should be the custom routing key message first
$this->assertCount(1, $envelopes);
/** @var Envelope $envelope */
$envelope = $envelopes[0];
/* @var Envelope $envelope */
$receiver->ack($envelopes[0]);
$this->assertEquals($customRoutingKeyMessage, $envelopes[0]->getMessage());

// should have a 2 second delay
$this->assertGreaterThanOrEqual($startTime + 2, time());
// but only a 2 second delay
$this->assertLessThan($startTime + 4, time());
// wait for final message (but max at 3 seconds)
$envelopes = $this->receiveEnvelopes($receiver, 3);
// duration should be about 2 seconds
$this->assertApproximateDuration($startTime, 2);

/** @var RedeliveryStamp|null $retryStamp */
/* @var RedeliveryStamp|null $retryStamp */
// verify the stamp still exists from the last send
$retryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertCount(1, $envelopes);
$retryStamp = $envelopes[0]->last(RedeliveryStamp::class);
$this->assertNotNull($retryStamp);
$this->assertSame(1, $retryStamp->getRetryCount());

Expand Down Expand Up @@ -206,4 +221,29 @@ private function createSerializer(): SerializerInterface
new SerializerComponent\Serializer([new ObjectNormalizer(), new ArrayDenormalizer()], ['json' => new JsonEncoder()])
);
}

private function assertApproximateDuration($startTime, int $expectedDuration)
{
$actualDuration = microtime(true) - $startTime;

if (\method_exists([$this, 'assertEqualsWithDelta'])) {
$this->assertEqualsWithDelta($expectedDuration, $actualDuration, 'Duration was not within expected range', .5);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the order of arguments is wrong. assertEqualsWithDelta($expected, $actual, float $delta, string $message = '')

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm on it

} else {
$this->assertEquals($expectedDuration, $actualDuration, 'Duration was not within expected range', .5);
}
}

/**
* @return Envelope[]
*/
private function receiveEnvelopes(ReceiverInterface $receiver, int $timeout): array
{
$envelopes = [];
$startTime = microtime(true);
while (0 === \count($envelopes) && $startTime + $timeout > time()) {
$envelopes = iterator_to_array($receiver->get());
}

return $envelopes;
}
}
38 changes: 26 additions & 12 deletions src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php
Expand Up @@ -79,8 +79,8 @@ class Connection
* * flags: Exchange flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * delay:
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%delay%")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%delay%")
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%")
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry")
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
Expand All @@ -90,9 +90,9 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
{
$this->connectionOptions = array_replace_recursive([
'delay' => [
'routing_key_pattern' => 'delay_%delay%',
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
'exchange_name' => 'delay',
'queue_name_pattern' => 'delay_queue_%delay%',
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
],
], $connectionOptions);
$this->exchangeOptions = $exchangeOptions;
Expand Down Expand Up @@ -186,7 +186,7 @@ public function publish(string $body, array $headers = [], int $delay = 0, AmqpS
$this->publishOnExchange(
$this->exchange(),
$body,
(null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey(),
$this->getRoutingKeyForMessage($amqpStamp),
[
'headers' => $headers,
],
Expand All @@ -209,14 +209,16 @@ public function countMessagesInQueues(): int
*/
private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null)
{
$routingKey = $this->getRoutingKeyForMessage($amqpStamp);

if ($this->shouldSetup()) {
$this->setupDelay($delay, null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null);
$this->setupDelay($delay, $routingKey);
}

$this->publishOnExchange(
$this->getDelayExchange(),
$body,
$this->getRoutingKeyForDelay($delay),
$this->getRoutingKeyForDelay($delay, $routingKey),
[
'headers' => $headers,
],
Expand Down Expand Up @@ -245,7 +247,7 @@ private function setupDelay(int $delay, ?string $routingKey)

$queue = $this->createDelayQueue($delay, $routingKey);
$queue->declareQueue();
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay));
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay, $routingKey));
}

private function getDelayExchange(): \AMQPExchange
Expand All @@ -271,13 +273,16 @@ private function getDelayExchange(): \AMQPExchange
private function createDelayQueue(int $delay, ?string $routingKey)
{
$queue = $this->amqpFactory->createQueue($this->channel());
$queue->setName(str_replace('%delay%', $delay, $this->connectionOptions['delay']['queue_name_pattern']));
$queue->setName(str_replace(
['%delay%', '%routing_key%'],
[$delay, $routingKey ?: ''],
$this->connectionOptions['delay']['queue_name_pattern']
));
$queue->setArguments([
'x-message-ttl' => $delay,
'x-dead-letter-exchange' => $this->exchange()->getName(),
]);

$routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey();
if (null !== $routingKey) {
// after being released from to DLX, this routing key will be used
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
Expand All @@ -286,9 +291,13 @@ private function createDelayQueue(int $delay, ?string $routingKey)
return $queue;
}

private function getRoutingKeyForDelay(int $delay): string
private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): string
{
return str_replace('%delay%', $delay, $this->connectionOptions['delay']['routing_key_pattern']);
return str_replace(
['%delay%', '%routing_key%'],
[$delay, $finalRoutingKey ?: ''],
$this->connectionOptions['delay']['routing_key_pattern']
);
}

/**
Expand Down Expand Up @@ -444,4 +453,9 @@ public function purgeQueues()
$this->queue($queueName)->purge();
}
}

private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string
{
return (null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey();
}
}