Skip to content

Commit

Permalink
[Messenger] fix delay delivery for non-fanout exchanges
Browse files Browse the repository at this point in the history
also fix dsn parsing of plain amqp:// uri
  • Loading branch information
Tobion committed Jun 14, 2019
1 parent 9865988 commit e9a587f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ class ConnectionTest extends TestCase
{
/**
* @expectedException \InvalidArgumentException
* @expectedExceptionMessage The given AMQP DSN "amqp://" is invalid.
* @expectedExceptionMessage The given AMQP DSN "amqp://:" is invalid.
*/
public function testItCannotBeConstructedWithAWrongDsn()
{
Connection::fromDsn('amqp://');
Connection::fromDsn('amqp://:');
}

public function testItGetsParametersFromTheDsn()
public function testItCanBeConstructedWithDefaults()
{
$this->assertEquals(
new Connection([
Expand All @@ -44,44 +44,58 @@ public function testItGetsParametersFromTheDsn()
], [
'messages' => [],
]),
Connection::fromDsn('amqp://localhost/%2f/messages')
Connection::fromDsn('amqp://')
);
}

public function testItGetsParametersFromTheDsn()
{
$this->assertEquals(
new Connection([
'host' => 'host',
'port' => 5672,
'vhost' => '/',
], [
'name' => 'custom',
], [
'custom' => [],
]),
Connection::fromDsn('amqp://host/%2f/custom')
);
}

public function testOverrideOptionsViaQueryParameters()
{
$this->assertEquals(
new Connection([
'host' => 'redis',
'host' => 'localhost',
'port' => 1234,
'vhost' => '/',
'vhost' => 'vhost',
'login' => 'guest',
'password' => 'password',
], [
'name' => 'exchangeName',
], [
'queueName' => [],
]),
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]')
Connection::fromDsn('amqp://guest:password@localhost:1234/vhost/queue?exchange[name]=exchangeName&queues[queueName]')
);
}

public function testOptionsAreTakenIntoAccountAndOverwrittenByDsn()
{
$this->assertEquals(
new Connection([
'host' => 'redis',
'port' => 1234,
'host' => 'localhost',
'port' => 5672,
'vhost' => '/',
'login' => 'guest',
'password' => 'password',
'persistent' => 'true',
], [
'name' => 'exchangeName',
], [
'queueName' => [],
]),
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [
Connection::fromDsn('amqp://localhost/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [
'persistent' => 'true',
'exchange' => ['name' => 'toBeOverwritten'],
])
Expand Down
53 changes: 27 additions & 26 deletions src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,21 @@ class Connection
*/
private $amqpDelayExchange;

public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
{
$this->connectionOptions = array_replace_recursive([
'delay' => [
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
'exchange_name' => 'delay',
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
],
], $connectionOptions);
$this->exchangeOptions = $exchangeOptions;
$this->queuesOptions = $queuesOptions;
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
}

/**
* Constructor.
*
* Available options:
*
* * host: Hostname of the AMQP service
Expand All @@ -81,29 +93,19 @@ class Connection
* * delay:
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%")
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry")
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay")
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
* * prefetch_count: set channel prefetch count
*/
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
{
$this->connectionOptions = array_replace_recursive([
'delay' => [
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
'exchange_name' => 'delay',
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
],
], $connectionOptions);
$this->exchangeOptions = $exchangeOptions;
$this->queuesOptions = $queuesOptions;
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
}

public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self
{
if (false === $parsedUrl = parse_url($dsn)) {
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
// this is a valid URI that parse_url cannot handle when you want to pass all parameters as options
if ('amqp://' !== $dsn) {
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
}

$parsedUrl = [];
}

$pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : [];
Expand Down Expand Up @@ -275,18 +277,17 @@ private function createDelayQueue(int $delay, ?string $routingKey)
$queue = $this->amqpFactory->createQueue($this->channel());
$queue->setName(str_replace(
['%delay%', '%routing_key%'],
[$delay, $routingKey ?: ''],
[$delay, $routingKey ?? ''],
$this->connectionOptions['delay']['queue_name_pattern']
));
));
$queue->setArguments([
'x-message-ttl' => $delay,
'x-dead-letter-exchange' => $this->exchange()->getName(),
]);

if (null !== $routingKey) {
// after being released from to DLX, this routing key will be used
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
}
// after being released from to DLX, make sure the original routing key will be used
// we must use an empty string instead of null for the argument to be picked up
$queue->setArgument('x-dead-letter-routing-key', $routingKey ?? '');

return $queue;
}
Expand All @@ -295,7 +296,7 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
{
return str_replace(
['%delay%', '%routing_key%'],
[$delay, $finalRoutingKey ?: ''],
[$delay, $finalRoutingKey ?? ''],
$this->connectionOptions['delay']['routing_key_pattern']
);
}
Expand Down

0 comments on commit e9a587f

Please sign in to comment.