Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messenger] Add a Doctrine transport #29007

Merged
merged 1 commit into from Mar 31, 2019

Conversation

vincenttouzet
Copy link
Contributor

@vincenttouzet vincenttouzet commented Oct 28, 2018

Q A
Branch? master
Bug fix? no
New feature? yes
BC breaks? no
Deprecations? no
Tests pass? yes
Fixed tickets
License MIT
Doc PR symfony/symfony-docs#10616
DoctrineBundle PR doctrine/DoctrineBundle#868

As discussed with @sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component.

Actually AMQP is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database).

How it works

The code is splitted betwwen this PR and the one on the DoctrineBundle : doctrine/DoctrineBundle#868

Configuration

To configure a Doctrine transport the dsn MUST have the format doctrine://<entity_manager_name> where <entity_manager_name> is the name of the entity manager (usually default)

        # config/packages/messenger.yaml
        framework:
            messenger:
                transports:
                    my_transport: "doctrine://default?queue=important"

Table schema

Dispatched messages are stored into a database table with the following schema:

Column Type Options Description
id bigint AUTO_INCREMENT, NOT NULL Primary key
body text NOT NULL Body of the message
headers text NOT NULL Headers of the message
queue varchar(32) NOT NULL Headers of the message
created_at datetime NOT NULL When the message was inserted onto the table. (automatically set)
available_at datetime NOT NULL When the message is available to be handled
delivered_at datetime NULL When the message was delivered to a worker

Message dispatching

When dispatching a message a new row is inserted into the table. See Symfony\Component\Messenger\Transport\Doctrine::publish

Message consuming

The message is retrieved by the Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver. It calls the Symfony\Component\Messenger\Transport\Doctrine::get method to get the next message to handle.

Getting the next message

  • Start a transaction
  • Lock the table to get the first message to handle (The lock is done with the SELECT ... FOR UPDATE query)
  • Update the message in database to update the delivered_at columns
  • Commit the transaction

Handling the message

The retrieved message is then passed to the handler. If the message is correctly handled the receiver call the Symfony\Component\Messenger\Transport\Doctrine::ack which delete the message from the table.

If an error occured the receiver call the Symfony\Component\Messenger\Transport\Doctrine::nack method which update the message to set the delivered_at column to null.

Message requeueing

It may happen that a message is stuck in delivered state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the DoctrineReceiver call the Symfony\Component\Messenger\Transport\Doctrine::requeueMessages. This method update all the message with a delivered_at not null since more than the "redeliver timeout" (default to 3600 seconds)

TODO

Copy link
Contributor

@Taluu Taluu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, but overall missing typehints / return hints as it's a feature for 4.3+, so php 7.1+. I'm guessing the missing tests are because it's a wip :}

@nicolas-grekas nicolas-grekas added this to the next milestone Oct 29, 2018
@vincenttouzet vincenttouzet force-pushed the messenger-doctrine-transport branch 3 times, most recently from d174c1b to 4023eaa Compare October 29, 2018 21:44
Copy link
Contributor

@sroze sroze left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add @experimental in 4.3 on all these classes please? Then, a few remaining comments. After this, it should be ready to go!

'body' => '{"message": "Hi handled"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue' => 'default',
'created_at' => '2019-03-15 12:00:00',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do this then. It's going to be YYYY-MM-DDTHH:mm:ss.sssZ, with the DateTime as UTC. I suggest to use a function rather than having the ->format(...) call copy/pasted.

@sroze
Copy link
Contributor

sroze commented Mar 31, 2019

@vincenttouzet mind the conflict with master. Once you've removed the id_strategy, can you squash and rebase as well?

@vincenttouzet vincenttouzet force-pushed the messenger-doctrine-transport branch 2 times, most recently from b2bccd7 to f4f4bf4 Compare March 31, 2019 13:15
@vincenttouzet
Copy link
Contributor Author

@sroze I've rebase and squash my commits :)

@sroze
Copy link
Contributor

sroze commented Mar 31, 2019

@vincenttouzet but you kept the id_strategy 😉

@vincenttouzet
Copy link
Contributor Author

That was just the doc in the configuration (part of another commit).

return null;
}

$doctrineEnvelope['headers'] = Type::getType(Type::JSON)->convertToPHPValue($doctrineEnvelope['headers'], $this->driverConnection->getDatabasePlatform());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type::JSON does not exists on early Doctrine versions as the low test suite is highlighting. Therefore, I suggest to move it back to STRING (like the body) and use json_encode. Whoever wants to have the table with a JSON field can create it manually. (FYI @weaverryan)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes i saw that. I was trying to require a minimum version for Doctrine but the Json type is available since v2.6.0 (tagged 2017-07-23 01:02) ... I've put back the json_encode / json_decode with the string type then

@sroze
Copy link
Contributor

sroze commented Mar 31, 2019

Thank you @vincenttouzet.

@sroze sroze merged commit 88d008c into symfony:master Mar 31, 2019
sroze added a commit that referenced this pull request Mar 31, 2019
This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Add a Doctrine transport

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets |
| License       | MIT
| Doc PR        | symfony/symfony-docs#10616
| DoctrineBundle PR | doctrine/DoctrineBundle#868

As discussed with @sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component.

Actually `AMQP` is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database).

# How it works

The code is splitted betwwen this PR and the one on the DoctrineBundle : doctrine/DoctrineBundle#868

## Configuration

To configure a Doctrine transport the dsn MUST have the format `doctrine://<entity_manager_name>` where `<entity_manager_name>` is the name of the entity manager (usually `default`)
```yml
        # config/packages/messenger.yaml
        framework:
            messenger:
                transports:
                    my_transport: "doctrine://default?queue=important"
```

## Table schema

Dispatched messages are stored into a database table with the following schema:

| Column       | Type     | Options                  | Description                                                       |
|--------------|----------|--------------------------|-------------------------------------------------------------------|
| id           | bigint   | AUTO_INCREMENT, NOT NULL | Primary key                                                       |
| body         | text     | NOT NULL                 | Body of the message                                               |
| headers      | text     | NOT NULL                 | Headers of the message                                            |
| queue      | varchar(32)     | NOT NULL                 | Headers of the message                                            |
| created_at   | datetime | NOT NULL                 | When the message was inserted onto the table. (automatically set) |
| available_at       | datetime   | NOT NULL                 | When the message is available to be handled                      |
| delivered_at | datetime | NULL                     | When the message was delivered to a worker                        |

## Message dispatching

When dispatching a message a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish`

## Message consuming

The message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle.

### Getting the next message

* Start a transaction
* Lock the table to get the first message to handle (The lock is done with the `SELECT ... FOR UPDATE` query)
* Update the message in database to update the delivered_at columns
* Commit the transaction

### Handling the message

The retrieved message is then passed to the handler. If the message is correctly handled the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::ack` which delete the message from the table.

If an error occured the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::nack` method which update the message to set the delivered_at column to `null`.

## Message requeueing

It may happen that a message is stuck in `delivered` state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the `DoctrineReceiver` call the `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method update all the message with a  `delivered_at` not null since more than the "redeliver timeout" (default to 3600 seconds)

# TODO

- [x] Add tests
- [x] Create DOC PR
- [x] PR on doctrine-bundle for transport factory
- [x] Add a `available_at` column
- [x] Add a `queue` column
- [x] Implement the retry functionnality : See #30557
- [x] Rebase after #29476

Commits
-------

88d008c [Messenger] Add a Doctrine transport
@vincenttouzet
Copy link
Contributor Author

@sroze while updating the Doc PR I realized that I've keep the loop_sleep option but this is not used. I can make another PR to remove it

@sroze
Copy link
Contributor

sroze commented Mar 31, 2019

@vincenttouzet sure.

@vincenttouzet
Copy link
Contributor Author

vincenttouzet commented Mar 31, 2019

@sroze Hum there is also a problem with the DoctrineTransportFactory:

Fatal error: Declaration of Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransportFactory::createTransport(string $dsn, array $options): Symfony\Component\Messenger\Transport\TransportInterface must be compatible with Symfony\Component\Messenger\Transport\TransportFactoryInterface::createTransport(string $dsn, array $options, Symfony\Component\Messenger\Transport\Serialization\SerializerInterface $serializer): Symfony\Component\Messenger\Transport\TransportInterface in /var/www/sources/symfony/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransportFactory.php on line 26

it worked before merging 🤔 . I can fix this in the same MR if you want

@sroze
Copy link
Contributor

sroze commented Mar 31, 2019

I fixed it in #30799.

sroze added a commit that referenced this pull request Mar 31, 2019
…terface (sroze)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Fix the Doctrine transport to use the new interface

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | yes
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #29007
| License       | MIT
| Doc PR        | ø

Commits
-------

75e3355 Fix the Doctrine transport to use the new interface
@vincenttouzet
Copy link
Contributor Author

@sroze Ok. The PR to remove the unused option is created : #30803

@vincenttouzet vincenttouzet deleted the messenger-doctrine-transport branch April 7, 2019 07:44
@nicolas-grekas nicolas-grekas modified the milestones: next, 4.3 Apr 30, 2019
@fabpot fabpot mentioned this pull request May 9, 2019
weaverryan added a commit to symfony/symfony-docs that referenced this pull request May 24, 2019
…uzet)

This PR was submitted for the master branch but it was merged into the 4.3 branch instead (closes #10616).

Discussion
----------

[Messenger] Describe the doctrine transport

Add documentation relative to the PR symfony/symfony#29007

TODO

- [ ] Document the max granularity in seconds for delay (doctrine/dbal#2873)

Commits
-------

27c0f9d [Messenger] Describe the doctrine tranport
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet