Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support CopyBoth queries and replication mode in config #778

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

petrosagg
Copy link
Contributor

This PR only adds support for setting the connection replication mode in the connection configuration and support for CopyBoth mode queries. This is the minimum support needed for downstream users to implement replication support on their own.

@jeff-davis After studying the code I think we don't need to add unpipelined_send nor worry about protocol desyncs.

The reason we don't need unpipelined_send is because just like CopyIn, when performing a CopyBoth operation the connection enters a special mode where it consumes FrontendMessages from a sub-receiver . These sub-receivers are the CopyInReceiver and CopyBothReceiver structs.

The reason we don't need to worry about protocol desyncs is that once the connection enters a sub-mode (with Copy{In,Both}Receiver) all other sends to the main command queue are ignored while the subprotocol is running. Only after these are exhausted, which always happens with a CopyDone/CopyFail/Sync message, the system resumes consuming the main queue.

The PR includes two tests that verify the client can resume operation to normal query processing. One that gracefully shuts down the replication stream and one that simply drops the handles.

Copy link
Contributor

@jeff-davis jeff-davis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great. A few questions/concerns.

tokio-postgres/tests/test/copy_both.rs Outdated Show resolved Hide resolved
tokio-postgres/src/copy_both.rs Outdated Show resolved Hide resolved
tokio-postgres/src/copy_both.rs Show resolved Hide resolved
tokio-postgres/src/copy_both.rs Outdated Show resolved Hide resolved
tokio-postgres/tests/test/copy_both.rs Show resolved Hide resolved
/// coming from the server. If it is not, `Sink::close` may hang forever waiting for the stream
/// messages to be consumed.
///
/// The copy should be explicitly completed via the `Sink::close` method to ensure all data is
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like dropping it is enough?

Comment on lines 100 to 251
/// The stream side *must* be consumed even if not required in order to process the messages
/// coming from the server. If it is not, `Sink::close` may hang forever waiting for the stream
/// messages to be consumed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you expand on the reasoning here? close() or drop() should certainly ensure that a CopyDone is sent. And if the receiver is closed, that should cause future messages received in this sub-protocol to be discarded, eventually allowing the protocol to resume normal operations. Right?

tokio-postgres/tests/test/copy_both.rs Outdated Show resolved Hide resolved
Comment on lines 151 to 153
// Indicate to CopyBothReceiver to produce a Sync message instead of CopyDone
let _ = this.error_sender.take().unwrap().send(());
return Poll::Ready(Some(Err(Error::db(error))));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this assume that the error happened during START_REPLICATION? What about an error that happened during the stream; shouldn't we still send a CopyDone?

@@ -449,6 +451,15 @@ impl Client {
copy_out::copy_out(self.inner(), statement).await
}

/// Executes a CopyBoth query, returning a combined Stream+Sink type to read and write copy
/// data.
pub async fn copy_both_simple<T>(&self, query: &str) -> Result<CopyBothDuplex<T>, Error>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the replication stream, if the timeline is historical, Postgres will send a tuple as a response. So we actually need a function that returns something like Result<(CopyBothDuplex<T>, Option<SimpleQueryMessage>), Error> (or maybe Result<(CopyBothDuplex<T>, Option<Vec<SimpleQueryMessage>>), Error> in case other commands are added in the future which use CopyBoth and return a set).

It's actually very specific to START_REPLICATION (and even more specifically, to physical replication), so it might make sense to have a more specific name or at least clarify what it's expecting the command to do. Maybe something like copy_both_simple_with_result()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point, I'll take a look on how we can expose this to users, ideally in a generic way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case you missed my other comment, there's a similar issue for BASE_BACKUP, except with CopyOut instead. That can be a separate PR, though.

@jeff-davis
Copy link
Contributor

START_REPLICATION and BASE_BACKUP don't follow a simple pattern. START_REPLICATION begins streaming in CopyBoth mode, and when the copy is finished, it then sometimes sends a single tuple (when streaming from a timeline other than the current one). BASE_BACKUP first sends two result sets (one tuple in the first set, multiple tuples in the second set), then it sends the copy data in CopyOut mode.

Supporting these commands requires some special methods on Client that won't really be useful for anything else, so I think we should name them specifically, e.g. client.start_replication_command() and client.base_backup_command().

@jeff-davis
Copy link
Contributor

//! It is recommended that you use a PostgreSQL server patch version                                               
//! of at least: 14.0, 13.2, 12.6, 11.11, 10.16, 9.6.21, or                                                        
//! 9.5.25. Earlier patch levels have a bug that doesn't properly                                                  
//! handle pipelined requests after streaming has stopped.

In my PR, I had the above comment. Does that apply to this PR, as well?

@jeff-davis
Copy link
Contributor

It would be good to have a way to send Standby Status Updates and Hot Standby Feedback. Not all of this has to be in this PR, though. I'm just commenting on everything necessary to make a full-featured and independent crate for replication.

@petrosagg petrosagg force-pushed the copy-both branch 2 times, most recently from 935bd20 to f5e018f Compare May 27, 2021 22:28
Copy link
Contributor Author

@petrosagg petrosagg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeff-davis thank you for the thorough review!

Unfortunately last night I realised that the original design had a big hole that allowed de-syncing the protocol. I just push a new version that I think is Correct (tm) now.

When doing a CopyBoth query the architecture looks something like this:

                                        |
        <tokio_postgres owned>          |    <userland owned>
                                        |
pg -> Connection -> CopyBothReceiver ---+---> CopyBothDuplex
                                        |          /   \
                                        |         v     v
                                        |      Sink    Stream

The original version of the feature handled the state machine of the CopyBoth sub-protocol as part of the Stream implementation of CopyBothDuplex and treated CopyBothReceiver as a dumb relay of messages into the connection. Therein lies the problem. A user could create a CopyBothDuplex and drop it immediately. In that case the dumb CopyBothReceiver would unconditionally send a CopyDone to the server and finish. But what if the server sent an error in the meantime? There was nothing to handle this fact and no Sync message was being sent.

So this re-work of the feature flips this relationship around. CopyBothReceiver contains all the logic to drive the sub-protocol forward and ensures that no matter what the correct messages are being exchanged with the server. The user is free to drop their end at any point.

I've also added a ton of comments and a few diagrams to make reading and reviewing the code easier. If it's not a lot of ask I'd hugely appreciate another round of review, I think we're close!

Next week I plan to compile a modified version of postgres that errors out on purpose to test out the error paths of this PR and also see how we can incorporate getting results back for things like timeline changes etc.

@@ -449,6 +451,15 @@ impl Client {
copy_out::copy_out(self.inner(), statement).await
}

/// Executes a CopyBoth query, returning a combined Stream+Sink type to read and write copy
/// data.
pub async fn copy_both_simple<T>(&self, query: &str) -> Result<CopyBothDuplex<T>, Error>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point, I'll take a look on how we can expose this to users, ideally in a generic way.

tokio-postgres/src/copy_both.rs Show resolved Hide resolved
tokio-postgres/tests/test/copy_both.rs Show resolved Hide resolved
@petrosagg
Copy link
Contributor Author

@jeff-davis

START_REPLICATION and BASE_BACKUP don't follow a simple pattern.

It looks like the general pattern is that the database can respond with result sets, copy outs, or copy boths. Based on that I think we need here is to define something like:

enum ResponsePart {
    RowStream(RowStream),
    SimpleQueryStream(SimpleQueryStream),
    CopyOutStream(CopyOutStream),
    CopyBothStream(CopyBothStream),
}

And then provide a method to send a query and get a Stream of ResponseParts back. This will leave the interpretations of the various segments to the user issuing the query or some other higher level downstream library. How does this sound?

In my PR, I had the above comment. Does that apply to this PR, as well?

hm do you have a link to the patch? I thought pipelined queries were not allowed over a replication connection

@jeff-davis
Copy link
Contributor

And then provide a method to send a query and get a Stream of ResponseParts back. This will leave the interpretations of the various segments to the user issuing the query or some other higher level downstream library. How does this sound?

I like it. Maybe it can even be refactored so that there's one generic entry point, query_simple_extended(), that returns a stream of ResponseParts; and the other entry points are just special cases of that one. But the refactoring might be better as a separate PR.

hm do you have a link to the patch? I thought pipelined queries were not allowed over a replication connection

https://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=a58db3aa10e62e4228aa409ba006014fa07a8ca2

Is there a reason you thought pipelined queries would not be allowed in replication mode? And if so, how would you prevent them, since the whole protocol implementation is built around pipelining, without explicitly blocking in certain cases?

@petrosagg
Copy link
Contributor Author

Is there a reason you thought pipelined queries would not be allowed in replication mode? And if so, how would you prevent them, since the whole protocol implementation is built around pipelining, without explicitly blocking in certain cases?

You're 100% right. I confused pipelined queries with extended query mode which is the one that's not allowed in replication mode. So yeah, your original comment still applies in that users should pay attention to this bug. I added this note in the documentation of the ReplicationMode enum and the Config::replication_mode function

Comment on lines 26 to 37
/// CopyOut-->ServerError<--CopyIn
/// \ | /
/// `---, | ,---'
/// \ | /
/// v v v
/// CopyNone
/// |
/// v
/// CopyComplete
/// |
/// v
/// CommandComplete
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the right place in the state machine for ServerError. If a server error happens in CopyBoth mode, that's the odd (and hard-to-test) case where an error is thrown by the server in the middle of streaming. From the docs, it looks like Postgres will just return the ErrorResponse and then ReadyForQuery, without either of the CommandComplete messages.

You can test this by hacking up test_decoding to throw an error randomly every 10 records or something.

vkrasnov referenced this pull request in readysettech/rust-postgres Jun 4, 2021
@jeff-davis
Copy link
Contributor

Summary of the remaining issues as I see them:

  1. Need an API to execute a query using the simple protocol and get back a stream of ResponseParts (or another API with similar functionality).
  2. Change the way errors during a CopyBoth stream are handled.
    1. The simplest answer is to close the whole client if this happens. That might be best, because handling the errors will be hard to test and not especially useful. Note: this reasoning does not apply to ordinary errors that happen as a part of the START_REPLICATION command, which should be properly handled; only errors that happen after entering CopyBoth mode can reasonably cause the client to close the connection.
    2. If we can reasonably test it, handling the errors would be the most proper thing to do, and the client can potentially try again with a new START_REPLICATION command without closing the entire connection. Honestly, I don't see a real use case here, but it does seem a bit more proper.
  3. It would be nice to offer an API to send Standby Status Updates and Hot Standby Feedback.

After these are done, I'll be able to port my other replication code to a new crate, and if that works, then I think this PR is ready.

@petrosagg petrosagg force-pushed the copy-both branch 7 times, most recently from 50c81b8 to 77dff40 Compare June 7, 2021 17:32
Comment on lines 106 to 109
if self.buffered_message.is_some() {
// If the receiver is gone we'll just drop the message
let _ = ready!(self.stream_sender.poll_ready(cx));
let message = self.buffered_message.take().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps use an if ... let here to avoid the unwrap()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, but if buffered_message is None then it is definitely a bug in the library because the outer if checked that it's Some(_). I'll replace unwrap() with expect() to provide an explanation in case this happens in the future

@jeff-davis
Copy link
Contributor

Maybe we should mark the Client with the ReplicationMode and provide a getter so that we know whether we can execute replication commands on a given client?

@petrosagg petrosagg force-pushed the copy-both branch 3 times, most recently from 2594af3 to 909f2dc Compare June 15, 2021 08:53
@petrosagg
Copy link
Contributor Author

hey @sfackler! Is there a chance you could take a look at this PR? It's the minimal support needed in rust-postgres to allow higher level crates to be developed independently

@videni
Copy link

videni commented Oct 5, 2021

what is the latest status?

@petrosagg
Copy link
Contributor Author

@videni this PR is ready, passes all tests and we have been using it in production for a long time at @MaterializeInc, unfortunately from a fork. It's the minimal amount of support needed in rust-postgres to allow for higher level external crates handling replication decoding etc.

@sfackler it would be amazing if we could get this merged and allow third party crates to do the rest of the work. It only implements CopyBoth support in the way you had suggested in an earlier review of yours and doesn't deal with the replication protocol itself at all

@yeputons
Copy link

yeputons commented Nov 23, 2021

I'm wondering if it's possible to consume Client in copy_both_simple and add an explicit method like CopyBothDuplex::close() which returns Client back. That way Rust (not the user) guarantees that it's impossible to interleave CopyBoth mode with usual queries. This is much more restrictive than non-mutable reference, but I think it's for a good reason.

Moreover, it will be possible to actually poll all messages from the server until the CopyDone/CommandComplete/etc message is received in the close() method before returning to the usual query mode. I'm not sure, but is seems that it's currently possible to drop CopyBothDuplex and then enter a race between next client.simple_query() call and CopyBothReceiver's termination.

Looks like a small change in this PR. Any downsides?

@petrosagg
Copy link
Contributor Author

@yeputons this design was implemented in a previous version of this work by @jeff-davis. Note that even with this PR it is still impossible to interleave CopyBoth mode with normal queries. If you attempt to send a query while you're holding onto a CopyDuplex client, these queries are going to be queued internally and not interfere with the COPY BOTH operation at the protocol level. So at least that hazard is taken care of.

You could maybe argue that by not statically enforcing it a user could end up in a deadlock situation if they send a query and await it while the copy both client is not yet dropped. However, given how much work has been put already in this PR to make it as small a change as possible to get it merged I'm inclined it to leave it as is.

@petrosagg
Copy link
Contributor Author

@sfackler thanks for updating the CI image to pick up configuration automatically! I rebased this PR on top of master, it would be amazing if you could find some time to review it

@Martichou
Copy link

@sfackler any news on this ? I'm relying on this PR for a project and I think a lot of people would benefits from this.
It would be great (and kind) to review the work that @petrosagg and @jeff-davis did on rust-postgres.

@benesch
Copy link
Contributor

benesch commented Mar 17, 2022

@Martichou, you're welcome to use our fork (https://github.com/materializeInc/rust-postgres) for the moment! It's got this PR and #774 together, and we integrate new changes from rust-postgres periodically.

(We would, of course, love to get these PRs upstreamed, but @sfackler's time has seemed quite limited lately.)

@Martichou
Copy link

@benesch Thanks ! I was doing the same (maintaining a fork), but I'll use yours from now on ;)

jeff-davis and others added 2 commits January 13, 2023 13:19
Co-authored-by: Petros Angelatos <petrosagg@gmail.com>
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
@manfredcml
Copy link

@sfackler It'd be great if you can review this PR by @petrosagg and @jeff-davis. Merging this will be helpful for those who are currently maintaining or using the forks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

7 participants