Skip to content

Commit

Permalink
[Messenger] Add a Doctrine transport
Browse files Browse the repository at this point in the history
  • Loading branch information
vincenttouzet committed Mar 31, 2019
1 parent dd47fda commit f4f4bf4
Show file tree
Hide file tree
Showing 14 changed files with 1,148 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ CHANGELOG
* [BC BREAK] The Amqp Transport now automatically sets up the exchanges
and queues by default. Previously, this was done when in "debug" mode
only. Pass the `auto_setup` connection option to control this.

* Added a `SetupTransportsCommand` command to setup the transports
* Added a Doctrine transport. For example, the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)

4.2.0
-----
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;

use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Driver\Statement;
use Doctrine\DBAL\Platforms\AbstractPlatform;
use Doctrine\DBAL\Query\QueryBuilder;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;

class ConnectionTest extends TestCase
{
public function testGetAMessageWillChangeItsStatus()
{
$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnectionMock();
$stmt = $this->getStatementMock([
'id' => 1,
'body' => '{"message":"Hi"}',
'headers' => \json_encode(['type' => DummyMessage::class]),
]);

$driverConnection
->method('createQueryBuilder')
->willReturn($queryBuilder);
$queryBuilder
->method('getSQL')
->willReturn('');
$driverConnection
->method('prepare')
->willReturn($stmt);

$connection = new Connection([], $driverConnection);
$doctrineEnvelope = $connection->get();
$this->assertEquals(1, $doctrineEnvelope['id']);
$this->assertEquals('{"message":"Hi"}', $doctrineEnvelope['body']);
$this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelope['headers']);
}

public function testGetWithNoPendingMessageWillReturnNull()
{
$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnectionMock();
$stmt = $this->getStatementMock(false);

$driverConnection->expects($this->once())
->method('createQueryBuilder')
->willReturn($queryBuilder);
$driverConnection->method('prepare')
->willReturn($stmt);
$driverConnection->expects($this->never())
->method('update');

$connection = new Connection([], $driverConnection);
$doctrineEnvelope = $connection->get();
$this->assertNull($doctrineEnvelope);
}

/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
{
$driverConnection = $this->getDBALConnectionMock();
$driverConnection->method('delete')->willThrowException(new DBALException());

$connection = new Connection([], $driverConnection);
$connection->ack('dummy_id');
}

/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
{
$driverConnection = $this->getDBALConnectionMock();
$driverConnection->method('delete')->willThrowException(new DBALException());

$connection = new Connection([], $driverConnection);
$connection->reject('dummy_id');
}

private function getDBALConnectionMock()
{
$driverConnection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class)
->disableOriginalConstructor()
->getMock();
$platform = $this->getMockBuilder(AbstractPlatform::class)
->getMock();
$platform->method('getWriteLockSQL')->willReturn('FOR UPDATE');
$driverConnection->method('getDatabasePlatform')->willReturn($platform);

return $driverConnection;
}

private function getQueryBuilderMock()
{
$queryBuilder = $this->getMockBuilder(QueryBuilder::class)
->disableOriginalConstructor()
->getMock();

$queryBuilder->method('select')->willReturn($queryBuilder);
$queryBuilder->method('update')->willReturn($queryBuilder);
$queryBuilder->method('from')->willReturn($queryBuilder);
$queryBuilder->method('set')->willReturn($queryBuilder);
$queryBuilder->method('where')->willReturn($queryBuilder);
$queryBuilder->method('andWhere')->willReturn($queryBuilder);
$queryBuilder->method('orderBy')->willReturn($queryBuilder);
$queryBuilder->method('setMaxResults')->willReturn($queryBuilder);
$queryBuilder->method('setParameter')->willReturn($queryBuilder);

return $queryBuilder;
}

private function getStatementMock($expectedResult)
{
$stmt = $this->getMockBuilder(Statement::class)
->disableOriginalConstructor()
->getMock();
$stmt->expects($this->once())
->method('fetch')
->willReturn($expectedResult);

return $stmt;
}

/**
* @dataProvider buildConfigurationProvider
*/
public function testBuildConfiguration($dsn, $options, $expectedManager, $expectedTableName, $expectedRedeliverTimeout, $expectedQueue)
{
$config = Connection::buildConfiguration($dsn, $options);
$this->assertEquals($expectedManager, $config['connection']);
$this->assertEquals($expectedTableName, $config['table_name']);
$this->assertEquals($expectedRedeliverTimeout, $config['redeliver_timeout']);
$this->assertEquals($expectedQueue, $config['queue_name']);
}

public function buildConfigurationProvider()
{
return [
[
'dsn' => 'doctrine://default',
'options' => [],
'expectedManager' => 'default',
'expectedTableName' => 'messenger_messages',
'expectedRedeliverTimeout' => 3600,
'expectedQueue' => 'default',
],
// test options from options array
[
'dsn' => 'doctrine://default',
'options' => [
'table_name' => 'name_from_options',
'redeliver_timeout' => 1800,
'queue_name' => 'important',
],
'expectedManager' => 'default',
'expectedTableName' => 'name_from_options',
'expectedRedeliverTimeout' => 1800,
'expectedQueue' => 'important',
],
// tests options from dsn
[
'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal',
'options' => [],
'expectedManager' => 'default',
'expectedTableName' => 'name_from_dsn',
'expectedRedeliverTimeout' => 1200,
'expectedQueue' => 'normal',
],
// test options from options array wins over options from dsn
[
'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal',
'options' => [
'table_name' => 'name_from_options',
'redeliver_timeout' => 1800,
'queue_name' => 'important',
],
'expectedManager' => 'default',
'expectedTableName' => 'name_from_options',
'expectedRedeliverTimeout' => 1800,
'expectedQueue' => 'important',
],
];
}

/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsAnExceptionIfAnExtraOptionsInDefined()
{
Connection::buildConfiguration('doctrine://default', ['new_option' => 'woops']);
}

/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsAnExceptionIfAnExtraOptionsInDefinedInDSN()
{
Connection::buildConfiguration('doctrine://default?new_option=woops');
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;

use Doctrine\DBAL\DriverManager;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;

/**
* @requires pdo_mysql
*/
class DoctrineIntegrationTest extends TestCase
{
private $driverConnection;
private $connection;

protected function setUp()
{
parent::setUp();

if (!getenv('MESSENGER_DOCTRINE_DSN')) {
$this->markTestSkipped('The "MESSENGER_DOCTRINE_DSN" environment variable is required.');
}
$dsn = getenv('MESSENGER_DOCTRINE_DSN');
$this->driverConnection = DriverManager::getConnection(['url' => $dsn]);
$this->connection = new Connection([], $this->driverConnection);
// call send to auto-setup the table
$this->connection->setup();
// ensure the table is clean for tests
$this->driverConnection->exec('DELETE FROM messenger_messages');
}

public function testConnectionSendAndGet()
{
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}

public function testSendWithDelay()
{
$this->connection->send('{"message": "Hi i am delayed"}', ['type' => DummyMessage::class], 600000);

$available_at = $this->driverConnection->createQueryBuilder()
->select('m.available_at')
->from('messenger_messages', 'm')
->where('m.body = :body')
->setParameter(':body', '{"message": "Hi i am delayed"}')
->execute()
->fetchColumn();

$available_at = new \DateTime($available_at);

$now = \DateTime::createFromFormat('U.u', microtime(true));
$now->modify('+60 seconds');
$this->assertGreaterThan($now, $available_at);
}

public function testItRetrieveTheFirstAvailableMessage()
{
// insert messages
// one currently handled
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi handled"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'delivered_at' => Connection::formatDateTime(\DateTime::createFromFormat('U.u', microtime(true))),
]);
// one available later
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi delayed"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 13:00:00')),
]);
// one available
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi available"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
]);

$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi available"}', $encoded['body']);
}

public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
{
$twoHoursAgo = new \DateTime('now');
$twoHoursAgo->modify('-2 hours');
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi requeued"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'delivered_at' => Connection::formatDateTime($twoHoursAgo),
]);
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi available"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
]);

$next = $this->connection->get();
$this->assertEquals('{"message": "Hi requeued"}', $next['body']);
$this->connection->reject($next['id']);
}
}

0 comments on commit f4f4bf4

Please sign in to comment.