Skip to content

Commit

Permalink
[Messenger] Add a Doctrine transport
Browse files Browse the repository at this point in the history
  • Loading branch information
vincenttouzet committed Oct 29, 2018
1 parent edcd627 commit f0b0993
Show file tree
Hide file tree
Showing 9 changed files with 418 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1499,6 +1499,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
if (empty($config['transports'])) {
$container->removeDefinition('messenger.transport.symfony_serializer');
$container->removeDefinition('messenger.transport.amqp.factory');
$container->removeDefinition('messenger.transport.doctrine.factory');
} else {
if ('messenger.transport.symfony_serializer' === $config['serializer']['id']) {
if (!$this->isConfigEnabled($container, $serializerConfig)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,13 @@

<tag name="messenger.transport_factory" />
</service>

<service id="messenger.transport.doctrine.factory" class="Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransportFactory">
<argument type="service" id="doctrine" on-invalid="ignore"/>
<argument type="service" id="messenger.transport.serializer" />
<argument>%kernel.debug%</argument>

<tag name="messenger.transport_factory" />
</service>
</services>
</container>
5 changes: 5 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
CHANGELOG
=========

4.3.0
-----

* Add a `Doctrine` transport using `doctrine://<connection_name>` DSN

4.2.0
-----

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;

use PHPUnit\Framework\TestCase;
use Symfony\Bridge\Doctrine\RegistryInterface;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransport;
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransportFactory;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class DoctrineTransportFactoryTest extends TestCase
{

public function testSupports()
{
$factory = new DoctrineTransportFactory(
$this->getMockBuilder(RegistryInterface::class)->getMock(),
null,
false
);

$this->assertTrue($factory->supports('doctrine://default', array()));
$this->assertFalse($factory->supports('amqp://localhost', array()));
}

public function testCreateTransport()
{
$connection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class)
->disableOriginalConstructor()
->getMock();
$registry = $this->getMockBuilder(RegistryInterface::class)->getMock();
$registry->expects($this->once())
->method('getConnection')
->willReturn($connection);

$factory = new DoctrineTransportFactory(
$registry,
null,
false
);

$this->assertEquals(
new DoctrineTransport(new Connection('messenger_messages', $connection, false), null),
$factory->createTransport('doctrine://default', array())
);
}

public function testCreateTransportWithCustomTableName()
{
$connection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class)
->disableOriginalConstructor()
->getMock();
$registry = $this->getMockBuilder(RegistryInterface::class)->getMock();
$registry->expects($this->once())
->method('getConnection')
->willReturn($connection);

$factory = new DoctrineTransportFactory(
$registry,
null,
false
);

$this->assertEquals(
new DoctrineTransport(new Connection('custom_messages', $connection, false), null),
$factory->createTransport('doctrine://default', array('table_name' => 'custom_messages'))
);
}
}
105 changes: 105 additions & 0 deletions src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Doctrine;

use Doctrine\DBAL\Connection as DriverConnection;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
use Doctrine\DBAL\Types\Type;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class Connection
{
private $tableName;
private $driverConnection;
private $debug;

const STATUS_PENDING = 'pending';
const STATUS_PROCESSING = 'processing';
const STATUS_ACK = 'ack';

public function __construct(string $tableName, DriverConnection $driverConnection, bool $debug = false)
{
$this->tableName = $tableName;
$this->driverConnection = $driverConnection;
$this->debug = $debug;
}

public function publish(string $body, array $headers): void
{
if (!$this->debug) {
$this->setup();
}
$stmt = $this->driverConnection->prepare('INSERT INTO '.$this->tableName.' (`body`, `headers`) VALUES (:body, :headers)');
$stmt->execute(array(
':body' => $body,
':headers' => $headers,
));
}

public function get(): array
{
if (!$this->debug) {
$this->setup();
}
$stmt = $this->driverConnection->prepare("SELECT * FROM {$this->tableName} WHERE status = 'pending' ORDER BY id ASC LIMIT 0,1");
$stmt->execute();

$doctrineEnvelop = $stmt->fetch();

$this->changeStatus($doctrineEnvelop['id'], self::STATUS_PROCESSING);

return $doctrineEnvelop;
}

public function ack($id): bool
{
return $this->changeStatus($id, self::STATUS_ACK);
}

public function nack($id): bool
{
return $this->changeStatus($id, self::STATUS_PENDING);
}

private function changeStatus($id, $status): bool
{
return $this->driverConnection->update($this->tableName, array('status' => $status), array('id' => $id)) > 0;
}

private function setup(): void
{
$synchronizer = new SingleDatabaseSynchronizer($this->driverConnection);
$synchronizer->updateSchema($this->getSchema());
}

private function getSchema(): Schema
{
$schema = new Schema();
$table = $schema->createTable($this->tableName);
$table->addColumn('id', Type::BIGINT)
->setAutoincrement(true)
->setNotnull(true);
$table->addColumn('body', Type::TEXT)
->setNotnull(true);
$table->addColumn('headers', Type::TARRAY)
->setNotnull(true);
$table->addColumn('status', Type::STRING)
->setLength(32)
->setDefault(self::STATUS_PENDING);
$table->setPrimaryKey(array('id'));

return $schema;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Doctrine;

use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineReceiver implements ReceiverInterface
{
private $connection;
private $serializer;
private $shouldStop = false;

public function __construct(Connection $connection, Serializer $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? Serializer::create();
}

public function receive(callable $handler): void
{
while (!$this->shouldStop) {
$doctrineEnvelop = $this->connection->get();

if (null === $doctrineEnvelop) {
$handler(null);

usleep(200000);
$this->pcntlDispatch();

continue;
}

try {
$handler($this->serializer->decode(array(
'body' => $doctrineEnvelop['body'],
'headers' => $doctrineEnvelop['headers'],
)));

$this->connection->ack($doctrineEnvelop['id']);
} catch (\Throwable $e) {
$this->connection->nack($doctrineEnvelop['id']);

throw $e;
} finally {
$this->pcntlDispatch();
}
}
}

public function stop(): void
{
$this->shouldStop = true;
}

private function pcntlDispatch()
{
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Doctrine;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineSender implements SenderInterface
{
private $connection;
private $serializer;

public function __construct(Connection $connection, Serializer $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? Serializer::create();
}

public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);

$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);

return $envelope;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Doctrine;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\TransportInterface;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineTransport implements TransportInterface
{
private $connection;
private $serializer;
private $receiver;
private $sender;

public function __construct(Connection $connection, Serializer $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? Serializer::create();
}

public function receive(callable $handler): void
{
($this->receiver ?? $this->getReceiver())->receive($handler);
}

public function stop(): void
{
($this->receiver ?? $this->getReceiver())->stop();
}

public function send(Envelope $envelope): Envelope
{
return ($this->sender ?? $this->getSender())->send($envelope);
}

private function getReceiver(): DoctrineReceiver
{
return $this->receiver = new DoctrineReceiver($this->connection, $this->serializer);
}

private function getSender(): DoctrineSender
{
return $this->sender = new DoctrineSender($this->connection, $this->serializer);
}
}

0 comments on commit f0b0993

Please sign in to comment.