Skip to content

Commit

Permalink
[Messenger] Add a Doctrine transport
Browse files Browse the repository at this point in the history
  • Loading branch information
vincenttouzet committed Oct 29, 2018
1 parent edcd627 commit 4fae182
Show file tree
Hide file tree
Showing 8 changed files with 347 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1499,6 +1499,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
if (empty($config['transports'])) {
$container->removeDefinition('messenger.transport.symfony_serializer');
$container->removeDefinition('messenger.transport.amqp.factory');
$container->removeDefinition('messenger.transport.doctrine.factory');
} else {
if ('messenger.transport.symfony_serializer' === $config['serializer']['id']) {
if (!$this->isConfigEnabled($container, $serializerConfig)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,13 @@

<tag name="messenger.transport_factory" />
</service>

<service id="messenger.transport.doctrine.factory" class="Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransportFactory">
<argument type="service" id="doctrine" on-invalid="ignore"/>
<argument type="service" id="messenger.transport.serializer" />
<argument>%kernel.debug%</argument>

<tag name="messenger.transport_factory" />
</service>
</services>
</container>
5 changes: 5 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
CHANGELOG
=========

4.3.0
-----

* Add a `Doctrine` transport

4.2.0
-----

Expand Down
104 changes: 104 additions & 0 deletions src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Doctrine;

use Doctrine\DBAL\Connection as DriverConnection;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
use Doctrine\DBAL\Types\Type;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class Connection
{
/** @var string */
private $tableName;
/** @var \Doctrine\DBAL\Driver\Connection */
private $driverConnection;
/** @var bool */
private $debug;

public function __construct(string $tableName, DriverConnection $driverConnection, bool $debug = false)
{
$this->tableName = $tableName;
$this->driverConnection = $driverConnection;
$this->debug = $debug;
}

public function publish(string $body, array $headers): void
{
if (!$this->debug) {
$this->setup();
}
$stmt = $this->driverConnection->prepare('INSERT INTO '.$this->tableName.' (`body`, `headers`) VALUES (:body, :headers)');
$stmt->execute([
':body' => $body,
':headers' => $headers,
]);
}

public function get(): array
{
if (!$this->debug) {
$this->setup();
}
$stmt = $this->driverConnection->prepare("SELECT * FROM {$this->tableName} WHERE status = 'pending' ORDER BY id ASC LIMIT 0,1");
$stmt->execute();

$doctrineEnvelop = $stmt->fetch();

$this->changeStatus($doctrineEnvelop['id'], 'processing');

return $doctrineEnvelop;
}

public function ack($id): bool
{
return $this->changeStatus($id, 'ack');
}

public function nack($id): bool
{
return $this->changeStatus($id, 'pending');
}

private function changeStatus($id, $status): bool
{
return $this->driverConnection->update($this->tableName, array('status' => $status), array('id' => $id)) > 0;
}

private function setup(): void
{
$synchronizer = new SingleDatabaseSynchronizer($this->driverConnection);
$synchronizer->updateSchema($this->getSchema());
}

private function getSchema(): Schema
{
$schema = new Schema();
$table = $schema->createTable($this->tableName);
$table->addColumn('id', Type::BIGINT)
->setAutoincrement(true)
->setNotnull(true);
$table->addColumn('body', Type::TEXT)
->setNotnull(true);
$table->addColumn('headers', Type::TARRAY)
->setNotnull(true);
$table->addColumn('status', Type::STRING)
->setLength(32)
->setDefault('pending');
$table->setPrimaryKey(array('id'));

return $schema;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Doctrine;

use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineReceiver implements ReceiverInterface
{
/** @var \Symfony\Component\Messenger\Transport\Doctrine\Connection */
private $connection;
/** @var \Symfony\Component\Messenger\Transport\Serialization\Serializer */
private $serializer;
/** @var bool */
private $shouldStop = false;

public function __construct(Connection $connection, Serializer $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? Serializer::create();
}

public function receive(callable $handler): void
{
while (!$this->shouldStop) {
$doctrineEnvelop = $this->connection->get();

if (null === $doctrineEnvelop) {
$handler(null);

usleep(200000);
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}

continue;
}

try {
$handler($this->serializer->decode(array(
'body' => $doctrineEnvelop['body'],
'headers' => $doctrineEnvelop['headers'],
)));

$this->connection->ack($doctrineEnvelop['id']);
} catch (\Throwable $e) {
$this->connection->nack($doctrineEnvelop['id']);

throw $e;
} finally {
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
}
}
}

public function stop(): void
{
$this->shouldStop = true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Doctrine;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineSender implements SenderInterface
{
/** @var \Symfony\Component\Messenger\Transport\Doctrine\Connection */
private $connection;
/** @var \Symfony\Component\Messenger\Transport\Serialization\Serializer */
private $serializer;

public function __construct(Connection $connection, Serializer $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? Serializer::create();
}

public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);

$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);

return $envelope;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Doctrine;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\TransportInterface;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineTransport implements TransportInterface
{
/** @var \Doctrine\DBAL\Connection */
private $connection;
/** @var \Symfony\Component\Messenger\Transport\Serialization\Serializer */
private $serializer;
/** @var \Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver */
private $receiver;
/** @var \Symfony\Component\Messenger\Transport\Doctrine\DoctrineSender */
private $sender;

public function __construct(Connection $connection, Serializer $serializer)
{
$this->connection = $connection;
$this->serializer = $serializer ?? Serializer::create();
}

public function receive(callable $handler): void
{
($this->receiver ?? $this->getReceiver())->receive($handler);
}

public function stop(): void
{
($this->receiver ?? $this->getReceiver())->stop();
}

public function send(Envelope $envelope): Envelope
{
return ($this->sender ?? $this->getSender())->send($envelope);
}

private function getReceiver(): DoctrineReceiver
{
return $this->receiver = new DoctrineReceiver($this->connection, $this->serializer);
}

private function getSender(): DoctrineSender
{
return $this->sender = new DoctrineSender($this->connection, $this->serializer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Doctrine;

use Symfony\Bridge\Doctrine\RegistryInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineTransportFactory implements TransportFactoryInterface
{
/** @var \Symfony\Bridge\Doctrine\RegistryInterface */
private $registry;
/** @var \Symfony\Component\Messenger\Transport\Serialization\Serializer */
private $serializer;
/** @var bool */
private $debug;

public function __construct(RegistryInterface $registry, Serializer $serializer = null, $debug = false)
{
$this->registry = $registry;
$this->serializer = $serializer ?? Serializer::create();
$this->debug = $debug;
}

public function createTransport(string $dsn, array $options): TransportInterface
{
$manager = str_replace('doctrine://', '', $dsn);

$connection = new Connection($options['table_name'] ?? 'messenger_messages', $this->registry->getConnection($manager), $this->debug);

return new DoctrineTransport($connection, $this->serializer);
}

public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'doctrine://');
}
}

0 comments on commit 4fae182

Please sign in to comment.