Skip to content

Commit

Permalink
[Messenger] Add a Doctrine transport
Browse files Browse the repository at this point in the history
  • Loading branch information
vincenttouzet committed Oct 28, 2018
1 parent edcd627 commit 536ed49
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1499,6 +1499,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
if (empty($config['transports'])) {
$container->removeDefinition('messenger.transport.symfony_serializer');
$container->removeDefinition('messenger.transport.amqp.factory');
$container->removeDefinition('messenger.transport.doctrine.factory');
} else {
if ('messenger.transport.symfony_serializer' === $config['serializer']['id']) {
if (!$this->isConfigEnabled($container, $serializerConfig)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,13 @@

<tag name="messenger.transport_factory" />
</service>

<service id="messenger.transport.doctrine.factory" class="Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransportFactory">
<argument type="service" id="doctrine" on-invalid="ignore"/>
<argument type="service" id="messenger.transport.serializer" />
<argument>%kernel.debug%</argument>

<tag name="messenger.transport_factory" />
</service>
</services>
</container>
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ CHANGELOG
* `ReceiverInterface` and its implementations have been moved to the `Transport\Receiver` sub-namespace
* `ActivationMiddlewareDecorator` has been renamed `ActivationMiddleware`
* `AllowNoHandlerMiddleware` has been removed in favor of a new constructor argument on `HandleMessageMiddleware`
* Add a `Doctrine` transport

4.1.0
-----
Expand Down
106 changes: 106 additions & 0 deletions src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Doctrine;

use Doctrine\DBAL\Connection as DriverConnection;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
use Doctrine\DBAL\Types\Type;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class Connection
{
/** @var string */
private $tableName;
/** @var \Doctrine\DBAL\Driver\Connection */
private $driverConnection;
/** @var bool */
private $debug;

public function __construct(string $tableName, DriverConnection $driverConnection, $debug = false)
{
$this->tableName = $tableName;
$this->driverConnection = $driverConnection;
$this->debug = $debug;
}

public function publish($body, $headers)
{
if (!$this->debug) {
$this->setup();
}
$query = sprintf(
'INSERT INTO %s (`body`, `headers`) VALUES (%s, %s)',
$this->tableName,
$this->driverConnection->quote($body),
$headers
);
$this->driverConnection->exec($query);
}

public function get()
{
if (!$this->debug) {
$this->setup();
}
$stmt = $this->driverConnection->prepare("SELECT * FROM {$this->tableName} WHERE status = 'pending' ORDER BY id ASC LIMIT 0,1");
$stmt->execute();

$doctrineEnvelop = $stmt->fetch();

$this->changeStatus($doctrineEnvelop['id'], 'processing');

return $doctrineEnvelop;
}

public function ack($id)
{
$this->changeStatus($id, 'ack');
}

public function nack($id)
{
$this->changeStatus($id, 'pending');
}

private function changeStatus($id, $status)
{
$this->driverConnection->update($this->tableName, array('status' => $status), array('id' => $id));
}

private function setup()
{
$synchronizer = new SingleDatabaseSynchronizer($this->driverConnection);
$synchronizer->updateSchema($this->getSchema());
}

private function getSchema()
{
$schema = new Schema();
$table = $schema->createTable($this->tableName);
$table->addColumn('id', Type::BIGINT)
->setAutoincrement(true)
->setNotnull(true);
$table->addColumn('body', Type::TEXT)
->setNotnull(true);
$table->addColumn('headers', Type::TARRAY)
->setNotnull(true);
$table->addColumn('status', Type::STRING)
->setLength(32)
->setDefault('pending');
$table->setPrimaryKey(array('id'));

return $schema;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Doctrine;

use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineReceiver implements ReceiverInterface
{
/** @var \Symfony\Component\Messenger\Transport\Doctrine\Connection */
private $connection;
/** @var \Symfony\Component\Messenger\Transport\Serialization\Serializer */
private $serializer;
/** @var bool */
private $shouldStop = false;

public function __construct(Connection $connection, Serializer $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? Serializer::create();
}

public function receive(callable $handler): void
{
while (!$this->shouldStop) {
$doctrineEnvelop = $this->connection->get();

if (null === $doctrineEnvelop) {
$handler(null);

usleep(200000);
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}

continue;
}

try {
$handler($this->serializer->decode(array(
'body' => $doctrineEnvelop['body'],
'headers' => $doctrineEnvelop['headers'],
)));

$this->connection->ack($doctrineEnvelop['id']);
} catch (\Throwable $e) {
$this->connection->nack($doctrineEnvelop['id']);

throw $e;
} finally {
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
}
}
}

public function stop(): void
{
$this->shouldStop = true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Doctrine;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineSender implements SenderInterface
{
/** @var \Symfony\Component\Messenger\Transport\Doctrine\Connection */
private $connection;
/** @var \Symfony\Component\Messenger\Transport\Serialization\Serializer */
private $serializer;

public function __construct(Connection $connection, Serializer $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? Serializer::create();
}

public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);

$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);

return $envelope;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Doctrine;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\TransportInterface;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineTransport implements TransportInterface
{
/** @var \Doctrine\DBAL\Connection */
private $connection;
/** @var \Symfony\Component\Messenger\Transport\Serialization\Serializer */
private $serializer;
/** @var \Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver */
private $receiver;
/** @var \Symfony\Component\Messenger\Transport\Doctrine\DoctrineSender */
private $sender;

public function __construct(Connection $connection, Serializer $serializer)
{
$this->connection = $connection;
$this->serializer = $serializer ?? Serializer::create();
}

public function receive(callable $handler): void
{
($this->receiver ?? $this->getReceiver())->receive($handler);
}

public function stop(): void
{
($this->receiver ?? $this->getReceiver())->stop();
}

public function send(Envelope $envelope): Envelope
{
return ($this->sender ?? $this->getSender())->send($envelope);
}

private function getReceiver()
{
return $this->receiver = new DoctrineReceiver($this->connection, $this->serializer);
}

private function getSender()
{
return $this->sender = new DoctrineSender($this->connection, $this->serializer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Doctrine;

use Symfony\Bridge\Doctrine\RegistryInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineTransportFactory implements TransportFactoryInterface
{
/** @var \Symfony\Bridge\Doctrine\RegistryInterface */
private $registry;
/** @var \Symfony\Component\Messenger\Transport\Serialization\Serializer */
private $serializer;
/** @var bool */
private $debug;

public function __construct(RegistryInterface $registry, Serializer $serializer = null, $debug = false)
{
$this->registry = $registry;
$this->serializer = $serializer ?? Serializer::create();
$this->debug = $debug;
}

public function createTransport(string $dsn, array $options): TransportInterface
{
$manager = str_replace('doctrine://', '', $dsn);

$connection = new Connection($options['table_name'] ?? 'messenger_messages', $this->registry->getConnection($manager), $this->debug);

return new DoctrineTransport($connection, $this->serializer);
}

public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'doctrine://');
}
}

0 comments on commit 536ed49

Please sign in to comment.