Skip to content

Commit

Permalink
[Messenger] Add a Doctrine transport
Browse files Browse the repository at this point in the history
  • Loading branch information
vincenttouzet committed Oct 29, 2018
1 parent c2e55ff commit 4023eaa
Show file tree
Hide file tree
Showing 12 changed files with 669 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1499,6 +1499,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
if (empty($config['transports'])) {
$container->removeDefinition('messenger.transport.symfony_serializer');
$container->removeDefinition('messenger.transport.amqp.factory');
$container->removeDefinition('messenger.transport.doctrine.factory');
} else {
if ('messenger.transport.symfony_serializer' === $config['serializer']['id']) {
if (!$this->isConfigEnabled($container, $serializerConfig)) {
Expand Down
5 changes: 5 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
CHANGELOG
=========

4.3.0
-----

* Add a `Doctrine` transport using `doctrine://<connection_name>` DSN

4.2.0
-----

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;

use Doctrine\DBAL\Statement;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;

class ConnectionTest extends TestCase
{
public function testGetAMessageWillChangeItsStatus()
{
$stmt = $this->getMockBuilder(Statement::class)
->disableOriginalConstructor()
->getMock();
$stmt->expects($this->once())
->method('execute');
$stmt->expects($this->once())
->method('fetch')
->willReturn(array(
'id' => 1,
'body' => '{"message":"Hi"}',
'headers' => \json_encode(array('type' => DummyMessage::class)),
));

$driverConnection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class)
->disableOriginalConstructor()
->getMock();
$driverConnection->expects($this->once())
->method('prepare')
->willReturn($stmt);
$driverConnection->expects($this->once())
->method('update')
->with('messages', array('status' => Connection::STATUS_PROCESSING), array('id' => 1));

$connection = new Connection('messages', $driverConnection);
$doctrineEnvelope = $connection->get();
$this->assertEquals(1, $doctrineEnvelope['id']);
$this->assertEquals('{"message":"Hi"}', $doctrineEnvelope['body']);
$this->assertEquals(array('type' => DummyMessage::class), $doctrineEnvelope['headers']);
}

public function testGetWithNoPendingMessageWillReturnNull()
{
$stmt = $this->getMockBuilder(Statement::class)
->disableOriginalConstructor()
->getMock();
$stmt->expects($this->once())
->method('execute');
$stmt->expects($this->once())
->method('fetch')
->willReturn(false);

$driverConnection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class)
->disableOriginalConstructor()
->getMock();
$driverConnection->expects($this->once())
->method('prepare')
->willReturn($stmt);
$driverConnection->expects($this->never())
->method('update');

$connection = new Connection('messages', $driverConnection);
$doctrineEnvelope = $connection->get();
$this->assertNull($doctrineEnvelope);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;

use Doctrine\DBAL\DriverManager;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;

/**
* @requires pdo_mysql
*/
class DoctrineIntegrationTest extends TestCase
{
protected function setUp()
{
parent::setUp();

if (!getenv('MESSENGER_DOCTRINE_DSN')) {
$this->markTestSkipped('The "MESSENGER_DOCTRINE_DSN" environment variable is required.');
}
}

public function testConnectionPublishAndGet()
{
$driverConnection = DriverManager::getConnection(array('url' => getenv('MESSENGER_DOCTRINE_DSN')));
$connection = new Connection('messages', $driverConnection, true);
$connection->publish('{"message": "Hi"}', array('type' => DummyMessage::class));
$encoded = $connection->get();
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(array('type' => DummyMessage::class), $encoded['headers']);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;

class DoctrineReceiverTest extends TestCase
{
public function testItSendTheDecodedMessageToTheHandlerAndAcknowledgeIt()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->getMock();
$connection->method('get')
->willReturn(array(
'id' => 'dummy_id',
'body' => '{"message": "Hi"}',
'headers' => array(
'type' => DummyMessage::class,
),
));

$connection->expects($this->once())->method('ack')->with('dummy_id');

$receiver = new DoctrineReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
$receiver->stop();
});
}

/**
* @expectedException \Symfony\Component\Messenger\Tests\Transport\Doctrine\InterruptException
*/
public function testItNonAcknowledgeTheMessageIfAnExceptionHappened()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->getMock();
$connection->method('get')
->willReturn(array(
'id' => 'dummy_id',
'body' => '{"message": "Hi"}',
'headers' => array(
'type' => DummyMessage::class,
),
));

$connection->expects($this->once())->method('nack')->with('dummy_id');

$receiver = new DoctrineReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
throw new InterruptException();
});
}
}

class InterruptException extends \Exception
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineSender;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class DoctrineSenderTest extends TestCase
{
public function testSend()
{
$envelope = new Envelope(new DummyMessage('Oy'));
$encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class));

$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->getMock();
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']);

$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);

$sender = new DoctrineSender($connection, $serializer);
$sender->send($envelope);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;

use PHPUnit\Framework\TestCase;
use Symfony\Bridge\Doctrine\RegistryInterface;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransport;
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransportFactory;

class DoctrineTransportFactoryTest extends TestCase
{
public function testSupports()
{
$factory = new DoctrineTransportFactory(
$this->getMockBuilder(RegistryInterface::class)->getMock(),
null,
false
);

$this->assertTrue($factory->supports('doctrine://default', array()));
$this->assertFalse($factory->supports('amqp://localhost', array()));
}

public function testCreateTransport()
{
$connection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class)
->disableOriginalConstructor()
->getMock();
$registry = $this->getMockBuilder(RegistryInterface::class)->getMock();
$registry->expects($this->once())
->method('getConnection')
->willReturn($connection);

$factory = new DoctrineTransportFactory(
$registry,
null,
false
);

$this->assertEquals(
new DoctrineTransport(new Connection('messenger_messages', $connection, false), null),
$factory->createTransport('doctrine://default', array())
);
}

public function testCreateTransportWithCustomTableName()
{
$connection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class)
->disableOriginalConstructor()
->getMock();
$registry = $this->getMockBuilder(RegistryInterface::class)->getMock();
$registry->expects($this->once())
->method('getConnection')
->willReturn($connection);

$factory = new DoctrineTransportFactory(
$registry,
null,
false
);

$this->assertEquals(
new DoctrineTransport(new Connection('custom_messages', $connection, false), null),
$factory->createTransport('doctrine://default', array('table_name' => 'custom_messages'))
);
}
}

0 comments on commit 4023eaa

Please sign in to comment.