Skip to content

Commit

Permalink
[Messenger] Doctrine transport - add an option for the id strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
vincenttouzet committed Mar 30, 2019
1 parent 1e083a0 commit 2a31b47
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,12 @@ public function testItThrowsAnExceptionIfAnExtraOptionsInDefinedInDSN()
{
Connection::buildConfiguration('doctrine://default?new_option=woops');
}

/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsAnExceptionIfTheIdStrategyIsNotSupported()
{
Connection::buildConfiguration('doctrine://default?id_strategy=not_supported');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ protected function setUp()
public function testConnectionSendAndGet()
{
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
$doctrineEnvelop = $this->connection->get();
$this->assertEquals('{"message": "Hi"}', $doctrineEnvelop['body']);
$this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelop['headers']);
}

public function testSendWithDelay()
Expand Down Expand Up @@ -96,8 +96,8 @@ public function testItRetrieveTheFirstAvailableMessage()
'available_at' => '2019-03-15 12:30:00',
]);

$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi available"}', $encoded['body']);
$doctrineEnvelop = $this->connection->get();
$this->assertEquals('{"message": "Hi available"}', $doctrineEnvelop['body']);
}

public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
Expand All @@ -124,4 +124,16 @@ public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
$this->assertEquals('{"message": "Hi requeued"}', $next['body']);
$this->connection->reject($next['id']);
}

public function testUuidStrategy()
{
$connection = new Connection(['id_strategy' => Connection::ID_STRATEGY_UUID], $this->driverConnection);
$this->driverConnection->exec('DROP TABLE messenger_messages');
$connection->setup();
$connection->send('{"message": "Hi uuid"}', ['type' => DummyMessage::class]);
$message = $this->connection->get();
$this->assertEquals(1, preg_match('/[a-f0-9]{8}\-[a-f0-9]{4}\-[a-f0-9]{4}\-(8|9|a|b)[a-f0-9]{3}\-[a-f0-9]{12}/', $message['id']));
$connection->reject($message['id']);
$this->driverConnection->exec('DROP TABLE messenger_messages');
}
}
51 changes: 40 additions & 11 deletions src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class Connection
* Configuration of the connection.
*
* * table_name: name of the table
* * id_strategy: Strategy for the id field. uuid or auto_increment: Default: auto_increment
* * connection: name of the Doctrine's entity manager
* * queue_name: name of the queue
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default 3600
Expand All @@ -39,11 +40,18 @@ class Connection
private $driverConnection;
const DEFAULT_OPTIONS = [
'table_name' => 'messenger_messages',
'id_strategy' => self::ID_STRATEGY_AUTO_INCREMENT,
'queue_name' => 'default',
'redeliver_timeout' => 3600,
'loop_sleep' => 200000,
'auto_setup' => true,
];
const ID_STRATEGY_AUTO_INCREMENT = 'auto_increment';
const ID_STRATEGY_UUID = 'uuid';
const ID_STRATEGIES = [
self::ID_STRATEGY_AUTO_INCREMENT,
self::ID_STRATEGY_UUID,
];

public function __construct(array $configuration, DBALConnection $driverConnection)
{
Expand Down Expand Up @@ -71,12 +79,17 @@ public static function buildConfiguration($dsn, array $options = [])
$configuration = [
'connection' => $components['host'],
'table_name' => $options['table_name'] ?? ($query['table_name'] ?? self::DEFAULT_OPTIONS['table_name']),
'id_strategy' => $options['id_strategy'] ?? ($query['id_strategy'] ?? self::DEFAULT_OPTIONS['id_strategy']),
'queue_name' => $options['queue_name'] ?? ($query['queue_name'] ?? self::DEFAULT_OPTIONS['queue_name']),
'redeliver_timeout' => $options['redeliver_timeout'] ?? ($query['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']),
'loop_sleep' => $options['loop_sleep'] ?? ($query['loop_sleep'] ?? self::DEFAULT_OPTIONS['loop_sleep']),
'auto_setup' => $options['auto_setup'] ?? ($query['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']),
];

if (!\in_array($configuration['id_strategy'], self::ID_STRATEGIES)) {
throw new TransportException(sprintf('Unknown id_strategy "%s". Supported strategies are [%s]', $configuration['id_strategy'], implode(', ', self::ID_STRATEGIES)));
}

// check for extra keys in options
$optionsExtraKeys = array_diff(array_keys($options), array_keys($configuration));
if (0 < \count($optionsExtraKeys)) {
Expand All @@ -102,15 +115,19 @@ public function send(string $body, array $headers, int $delay = 0): void
$now = (\DateTime::createFromFormat('U.u', microtime(true)));
$availableAt = (clone $now)->modify(sprintf('+%d seconds', $delay / 1000));

$values = [
'body' => ':body',
'headers' => ':headers',
'queue_name' => ':queue_name',
'created_at' => ':created_at',
'available_at' => ':available_at',
];
if (self::ID_STRATEGY_UUID === $this->configuration['id_strategy']) {
$values['id'] = $this->driverConnection->getDatabasePlatform()->getGuidExpression();
}
$queryBuilder = $this->driverConnection->createQueryBuilder()
->insert($this->configuration['table_name'])
->values([
'body' => ':body',
'headers' => ':headers',
'queue_name' => ':queue_name',
'created_at' => ':created_at',
'available_at' => ':available_at',
]);
->values($values);

$this->executeQuery($queryBuilder->getSQL(), [
':body' => $body,
Expand Down Expand Up @@ -223,11 +240,23 @@ private function getSchema(): Schema
{
$schema = new Schema();
$table = $schema->createTable($this->configuration['table_name']);
$table->addColumn('id', Type::BIGINT)
->setAutoincrement(true)
->setNotnull(true);
$table->addColumn('body', Type::TEXT)
switch ($this->configuration['id_strategy']) {
case self::ID_STRATEGY_UUID:
$table->addColumn('id', Type::GUID)
->setNotnull(true);
break;
case self::ID_STRATEGY_AUTO_INCREMENT:
$table->addColumn('id', Type::BIGINT)
->setAutoincrement(true)
->setNotnull(true);
break;
default:
throw new TransportException(sprintf('Unknown id_strategy "%s". Supported strategies are [%s]', $this->configuration['id_strategy'], self::ID_STRATEGIES));
}
if ($this->configuration['id_strategy']) {
$table->addColumn('body', Type::TEXT)
->setNotnull(true);
}
$table->addColumn('headers', Type::JSON)
->setNotnull(true);
$table->addColumn('queue_name', Type::STRING)
Expand Down

0 comments on commit 2a31b47

Please sign in to comment.