
pipeline mode (v2) #116

Closed
wants to merge 15 commits into from

Conversation

@dlax (Contributor) commented Oct 15, 2021

Second attempt to implement the pipeline mode (#74), replacing #93. This is on top of PR #120 (first 5 commits).
I still suggest reading it commit by commit, though the last one is large.

The core algorithm implements what's described at https://www.postgresql.org/docs/14/libpq-pipeline-mode.html#LIBPQ-PIPELINE-INTERLEAVE during execute() steps. In fetch*() steps during the pipeline, we may force a flush of the output buffer with PQsendFlushRequest(); this also happens at the end of the pipeline.
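The interleave loop can be sketched roughly as follows. This is a simplified model, not the actual psycopg code: `MockPGconn` and its `flush()`/`get_result()` methods are hypothetical stand-ins that only imitate the libpq behaviour (PQflush() returning 1 while data remains to be sent, PQgetResult() returning results as they become available).

```python
from collections import deque

class MockPGconn:
    """Toy stand-in for a libpq connection in pipeline mode (illustrative only).

    flush() returns 1 while the output buffer still holds data, 0 once empty;
    get_result() pops the next available result, or None when there are no more.
    """
    def __init__(self, results):
        self._results = deque(results)
        self._pending_flush = 2  # pretend two flush calls are needed

    def flush(self):
        if self._pending_flush:
            self._pending_flush -= 1
        return 1 if self._pending_flush else 0

    def get_result(self):
        return self._results.popleft() if self._results else None

def pipeline_communicate(pgconn):
    """Sketch of the interleave: keep sending queued queries while the
    output buffer is not empty, then drain whatever results are ready."""
    fetched = []
    while pgconn.flush():  # output buffer not empty: keep sending
        pass
    while True:            # then consume available results
        res = pgconn.get_result()
        if res is None:
            break
        fetched.append(res)
    return fetched

conn = MockPGconn(["COMMAND_OK", "TUPLES_OK"])
print(pipeline_communicate(conn))  # → ['COMMAND_OK', 'TUPLES_OK']
```

In the real generator, the choice between sending and fetching is driven by the socket's read/write ready state rather than by a simple loop, but the alternation is the same.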

This is tested with a strategy similar to the PostgreSQL test at https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/test/modules/libpq_pipeline/libpq_pipeline.c;h=c27c4e0adaf052749574681dc4d2c50ccab9b08b;hb=refs/tags/REL_14_0#l632

Automatic prepared statements are now handled (in contrast with #93), thanks to some refactoring in PrepareManager.

The "lock" idea suggested at #93 (comment) and #93 (comment) (requiring a fetch*() call before a subsequent execute() call on a cursor) is not implemented. In fact, I'm not quite sure it's needed or, more specifically, why it would be needed in pipeline mode but not in normal mode... To be discussed (like everything else).

@dlax (Contributor, Author) commented Oct 18, 2021

Rebased on #120 and (hopefully) fixed the automatic prepared statement mode.
(Ready to review now.)

@dlax dlax marked this pull request as ready for review October 18, 2021 13:27
@dvarrazzo dvarrazzo added this to the 3.1 milestone Nov 10, 2021
pgconn.consume_input()

# Consume notifies
while 1:
Member:

I guess you copied this from my code. while 1 is a pattern I can't get out of my muscle memory...

Contributor Author:

Yes :) /me trying to be consistent with the existing style... I wouldn't mind if we changed all of these!

@dvarrazzo (Member) left a comment:

Hey @dlax, amazing work!

I've gone through the patches step by step and highlighted small things along the way. Now I will check out the branch to see the code as a whole, follow the flow, and play with it a little.

Thank you very much!

psycopg/psycopg/generators.py — outdated review thread (resolved)


@pytest.mark.slow
def test_pipeline_communicate(pgconn, demo_pipeline, generators):
Member:

Are these really useful as unit tests in this form, or would they be better as demonstration scripts, like the ones you can find in tests/scripts/?

Contributor Author:

Agreed. These were necessary for me to come up with a working algorithm, but I've now moved them to scripts.
Incidentally, running python tests/scripts/pipeline-demo.py -n 1 [--async] produces a deadlock, so there seems to be a problem in the high-level API (no problem with the generator AFAICT): to be investigated...

Contributor Author:

> Incidentally, running python tests/scripts/pipeline-demo.py -n 1 [--async] produces a deadlock, so there seems to be a problem in the high-level API

Strangely, the error is gone now after rebase. Perhaps because the waiting function (wait_async()) changed in the meantime.

Contributor Author:

Hm, now it fails with python tests/scripts/pipeline-demo.py -n10 [--async]; investigating...

psycopg/psycopg/_preparing.py — outdated review thread (resolved)
psycopg/psycopg/_preparing.py — outdated review thread (resolved)
for res in results:
if res.status not in self._status_ok:
self._raise_from_results(results)
if not self._pgconn.pipeline_status:
Member:

Maybe we can set results = () in the if pipeline_mode branch above to avoid this if here. Not for some micro-optimisation, but because this method is pretty complicated.

I still don't have a holistic view over the pipeline changes so I don't know if this will be changed later in the changeset I haven't seen yet, or if it can be simplified somehow.
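A minimal sketch of the simplification suggested above, with illustrative names (not the actual psycopg code): setting `results = ()` in the pipeline branch lets the status-check loop run unconditionally, since iterating an empty tuple is a no-op.

```python
def check_status(res):
    # Illustrative status check: raise on anything that isn't "OK".
    if res != "OK":
        raise RuntimeError(f"bad result: {res}")

def execute_results(pipeline_mode, run_query):
    # In pipeline mode there is nothing to check yet (results are
    # verified at fetch time), so use an empty tuple: the loop below
    # then needs no extra `if not pipeline_mode` guard.
    results = () if pipeline_mode else run_query()
    for res in results:
        check_status(res)
    return results

print(execute_results(True, lambda: ["OK"]))   # → ()
print(execute_results(False, lambda: ["OK"]))  # → ['OK']
```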

Member:

(ah just noticed this is the last changeset...)

tests/test_pipeline_async.py — review thread (resolved)
tests/test_pipeline_async.py — review thread (resolved)
psycopg/psycopg/connection.py — outdated review thread (resolved)
@dvarrazzo (Member) commented:

I have rebased your branch. I won't force-push as it feels impolite: you can fetch it back from the pipeline2 branch in our repo.

@dlax (Contributor, Author) commented Nov 12, 2021

> I have rebased your branch. I won't force-push as it feels impolite: you can fetch it back from the pipeline2 branch in our repo.

I'm not sure how we should proceed then:

* what to do with [support RW ready in waiting functions #120](https://github.com/psycopg/psycopg/pull/120) (see my last comment [#120 (comment)](https://github.com/psycopg/psycopg/pull/120#issuecomment-966156078))?

* how should I account for review change requests? add more commits here?

@dvarrazzo (Member) commented:

> I have rebased your branch. I won't force-push as it feels impolite: you can fetch it back from the pipeline2 branch in our repo.

> I'm not sure how we should proceed then:
>
> * what to do with [support RW ready in waiting functions #120](https://github.com/psycopg/psycopg/pull/120) (see my last comment [#120 (comment)](https://github.com/psycopg/psycopg/pull/120#issuecomment-966156078))?
>
> * how should I account for review change requests? add more commits here?

Rebasing on master seems independent from the decision about what to do. You can git reset --hard upstream/pipeline2 to have the conflicts resolved and then we can think about it.

Or am I missing something?

@dlax (Contributor, Author) commented Nov 12, 2021

Yes, I'm fine with rebasing; it's done for both PRs now.
It's just that I find it awkward to work with two dependent PRs at the same time...

@dvarrazzo (Member) commented:

> It's just that I find it awkward to work with two dependent PRs at the same time...

Yes, I understand. I apologise for having left this open for so long; I hadn't realised until yesterday that you were working on the anyio front too.

@dlax dlax force-pushed the pipeline2 branch 3 times, most recently from be4996b to fea4673 Compare November 18, 2021 08:26
@dlax (Contributor, Author) commented Nov 18, 2021

Hi @dvarrazzo, I have rebased this PR on your "pipeline-over-prepare" branch and included the last commit (the change from review), as it was missing. Ready to go as far as I can tell!

@dvarrazzo (Member) commented:

Hello @dlax

Playing a bit with the current state of the pipeline. There is this thing:

def test_error(conn):
    conn.autocommit = True
    with conn.pipeline() as pipeline:
        conn.execute("select error")
        conn.execute("select 1").fetchone()

this test fails on the fetchone(), with "pipeline aborted". I can see that we do receive the error and we associate it with the cursor but, because we never fetch, we never execute check_result.

If there is no fetch at all, errors are pretty much swallowed and there is no report that something failed, or what failed. This test passes with no exception, but no table is created.

def test_error(conn):
    conn.autocommit = True
    with conn.pipeline() as pipeline:
        conn.execute("select error")
        conn.execute("create table voila ()")

This test instead blocks:

def test_error(conn):
    with conn.transaction():
        with conn.pipeline() as pipeline:
            conn.execute("select error")
            conn.execute("create table voila ()")

I can see, on the wire, that COMMIT is sent but no result is received.

I wonder if all these artifacts are related and how we can deal with them better, especially avoiding silent failures.

dlax and others added 3 commits November 29, 2021 12:03
This generator will be used as the "send" operation during execute()
step when the connection is in pipeline mode. It can consume results or
send queries depending on socket read/write ready state.

Added test is taken from PostgreSQL sources. It demonstrates the use of
pipeline mode where query-send and results-fetch steps are interleaved
without any sync point emitted. (In this test, this works because the
output buffer gets full.) The test writes data to logs. These will be
displayed upon failure by pytest but can be seen otherwise with
--log-file=<path> command-line argument.

Typically, when running:

    pytest tests/test_pipeline.py::test_pipeline_communicate --log-file=log

the 'log' file would contain (prefixes stripped):

    sent begin: BEGIN TRANSACTION
    sent drop table: DROP TABLE IF EXISTS pq_pipeline_demo
    sent create table: CREATE UNLOGGED TABLE pq_pipeline_demo( id serial primary key, itemno integer, int8filler int8)
    sent prepare: INSERT INTO pq_pipeline_demo(itemno, int8filler) VALUES ($1, $2)
    sent row 9999
    sent row 9998
    sent row 9997
    sent row 9996
    sent row 9995
    sent row 9994
    sent row 9993
    sent row 9992
    sent row 9991
    ...
    sent row 9690
    sent row 9689
    sent row 9688
    got COMMAND_OK results for 'begin' command
    got COMMAND_OK results for 'drop table' command
    got COMMAND_OK results for 'create table' command
    got COMMAND_OK results for 'prepare' command
    got COMMAND_OK results for 'insert 9999' command
    got COMMAND_OK results for 'insert 9998' command
    got COMMAND_OK results for 'insert 9997' command
    got COMMAND_OK results for 'insert 9996' command
    got COMMAND_OK results for 'insert 9995' command
    ...
    got COMMAND_OK results for 'insert 9691' command
    got COMMAND_OK results for 'insert 9690' command
    sent row 9687
    sent row 9686
    sent row 9685
    sent row 9684
    sent row 9683
    ...
    sent row 2
    sent row 1
    sent row 0
    sent COMMIT
    pipeline sync sent
    got COMMAND_OK results for 'insert 236' command
    got COMMAND_OK results for 'insert 235' command
    got COMMAND_OK results for 'insert 234' command
    got COMMAND_OK results for 'insert 233' command
    ...
    got COMMAND_OK results for 'insert 0' command
    got COMMAND_OK results for 'commit' command
    got PIPELINE_SYNC results for 'pipeline sync' command

We can see that commands are sent until the output buffer is full (the
connection is then read-ready only), then results are fetched until
more commands can be sent, and the cycle repeats.
In tests, we check pipeline results with PGconn interface until we
implement result processing in the Python API.
As suggested at [1], stream() and pipeline mode are different use cases.

[1]: psycopg#93 (comment)
dlax and others added 12 commits November 29, 2021 12:03
In error_from_result(), "state" may be the empty string if
result.error_field() returns None; this value is then passed to
_class_for_state() and then, possibly, to get_base_exception() which
would fail with 'IndexError: string index out of range'.
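The defensive check described in this commit can be sketched as follows. The names (`class_for_state`, the SQLSTATE map) are illustrative, not the actual psycopg internals: the point is that an empty state string must be caught before any `state[0]`-style indexing.

```python
# Illustrative mapping from a two-character SQLSTATE class to an
# exception type (real SQLSTATE classes, toy exception choices).
SQLSTATE_EXCEPTIONS = {"42": SyntaxError, "23": ValueError}

def class_for_state(state):
    # Guard against an empty SQLSTATE: error_field() may return None,
    # yielding state == "", and indexing "" (e.g. state[0]) would raise
    # IndexError: string index out of range.
    if not state:
        return RuntimeError  # generic fallback exception class
    return SQLSTATE_EXCEPTIONS.get(state[:2], RuntimeError)

print(class_for_state(""))       # → <class 'RuntimeError'>
print(class_for_state("42601"))  # → <class 'SyntaxError'>
```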
These results, in contrast with other ones, are not separated by a NULL
pointer when repeatedly calling PQgetResult(). We return them
separately in order to be able to process them in the same way as other
query/command results.
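The fetch loop described above can be modelled with a toy result stream (illustrative names and a simplified "end of stream" sentinel, not the real libpq API): ordinary query results end with a NULL separator, while a PIPELINE_SYNC result does not, so the loop returns it as a group of its own.

```python
from collections import deque

DONE = object()  # sentinel for "no more results at all" (simplification)

class MockPGconn:
    """Toy result stream: None plays the role of the NULL separator
    between query results; PIPELINE_SYNC arrives with no trailing None."""
    def __init__(self, stream):
        self._stream = deque(stream)

    def get_result(self):
        return self._stream.popleft() if self._stream else DONE

def fetch_all(pgconn):
    """Group results per query; return PIPELINE_SYNC on its own so it
    can be processed like any other command's result."""
    groups, current = [], []
    while True:
        res = pgconn.get_result()
        if res is DONE:
            if current:
                groups.append(current)
            return groups
        if res is None:              # NULL separator: close current group
            groups.append(current)
            current = []
        elif res == "PIPELINE_SYNC":
            if current:              # not NULL-separated: close any open
                groups.append(current)   # group, then emit the sync alone
                current = []
            groups.append([res])
        else:
            current.append(res)

stream = ["COMMAND_OK", None, "TUPLES_OK", None, "PIPELINE_SYNC", "COMMAND_OK", None]
print(fetch_all(MockPGconn(stream)))
# → [['COMMAND_OK'], ['TUPLES_OK'], ['PIPELINE_SYNC'], ['COMMAND_OK']]
```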
In BaseCursor._check_result(), we now handle FATAL_ERROR and (the
pipeline-mode specific error) PIPELINE_ABORTED in preparation for
supporting the pipeline mode, where we check results upon fetch*()
instead of upon execute() as currently.

Similarly, BaseCursor._raise_from_results() handles PIPELINE_ABORTED
result status.
BaseConnection gets a _pipeline_queue attribute which stores identifiers
of requests sent in the pipeline pending results fetch (and processing).
For queries, where data is expected as results, we store a tuple with
respective cursor and query information needed to maintain automatic
prepared statement. For commands, we store None.

Everywhere we run the execute() generator in non-pipeline mode, we now
simply enqueue an item in the pipeline queue. Then we run
pipeline_communicate(), through the _pipeline_communicate_gen() method
of BaseConnection, in BaseCursor._execute(many)_gen().

Since pipeline_communicate() may not fetch all results, we need a
dedicated fetch (forced) step upon call to cursor.fetch*(); this is done
by Cursor._fetch_pipeline() called in fetch*() methods. This usually
calls PQsendFlushRequest() in order to avoid blocking on PQgetResult().

Similarly, at exit of pipeline mode, we force results fetch.

Results are checked at fetch*() time instead of at execute() time as in
normal mode; hence the conditional results check in _execute_results().
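The queue mechanism described in this commit message can be sketched with a toy model (all names illustrative; the "wire" deque merely simulates the server replying once per queued item):

```python
from collections import deque

class PipelineConn:
    """Toy model of the pipeline queue: execute() only enqueues; results
    are matched back to their pending items when communicate() runs."""
    def __init__(self):
        # (cursor, query) for queries; None would be stored for commands
        self.queue = deque()
        self._wire = deque()  # pretend server: one result per queued item

    def execute(self, cursor, query):
        self.queue.append((cursor, query))
        self._wire.append(f"result of {query}")  # simulate a server reply

    def communicate(self):
        # Drain available results, attaching each to its pending cursor
        # (items queued as None, i.e. plain commands, are simply skipped).
        while self.queue and self._wire:
            item = self.queue.popleft()
            res = self._wire.popleft()
            if item is not None:
                cursor, _query = item
                cursor.append(res)  # cursor modelled as a plain list

cur = []          # stand-in for a cursor's result storage
conn = PipelineConn()
conn.execute(cur, "select 1")
conn.communicate()
print(cur)  # → ['result of select 1']
```

In the real code the fetch is forced at cursor.fetch*() time (with PQsendFlushRequest() to avoid blocking), whereas this sketch drains eagerly; the queue-matching idea is the same.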

In tests, we add test_pipeline_demo() which reproduces the PostgreSQL
test introduced earlier for pipeline_communicate() generator but only
uses psycopg API without any explicit results fetch.

Other tests are unit tests checking that specific features (e.g.
transactions or prepared statement) work well with the pipeline mode.
Changes to the prepared statements were extracted in a separate branch
and completed, then the pipeline changes were added on top of them. This
changeset reconciles the two.
The script logs to stderr or to a specified log file; it can run both
the sync and async pipeline demos, and the number of rows to insert can
be specified.
Moving the fixture there as well, thus dropping fix_pipeline.py.
The script can now run the pipeline demo with psycopg high level API as
well as with 'psycopg.pq' (low-level) API.
These were useful while developing the feature but less now.
@dlax (Contributor, Author) commented Dec 2, 2021

@dvarrazzo, I've been working back on this following your last message and found several new problems :)

First, there is an issue with the transaction code in pipeline mode, because the transaction code emits several commands in a single query (separated by ;) and this is disallowed in pipeline mode (it raises a syntax error on the PostgreSQL side). So I think this should be removed first. (Done on my side.)

Then, there is indeed something wrong in how we process results at the end of the pipeline, which sometimes blocks execution, as you noticed in your last message. Still investigating the issue.

Finally, I've reworked the pipeline_communicate() generator to avoid sending commands when the socket is not write-ready, by passing it a queue of callables (equivalent to PQsend*() calls). This part is working, but unfortunately does not resolve the blocking effect mentioned before.
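The rework described above can be sketched as follows. This is a simplified, synchronous model (the ready-state list stands in for what a `yield Wait.RW` would report; all names are illustrative): commands are callables that are popped and invoked only when the socket is write-ready.

```python
from collections import deque

def pipeline_communicate(commands, ready_states):
    """Sketch: pop a queued command (a callable wrapping a PQsend*()-style
    call) only on write-ready; on read-ready we would consume input and
    drain results instead. `ready_states` simulates successive wait results."""
    commands = deque(commands)
    sent = []
    for state in ready_states:       # stand-in for repeated Wait.RW yields
        if state == "W" and commands:
            sent.append(commands.popleft()())  # send one queued command
        elif state == "R":
            pass  # would call pgconn.consume_input() / fetch results here
        if not commands:
            break
    return sent

cmds = [lambda: "sent q1", lambda: "sent q2"]
print(pipeline_communicate(cmds, ["R", "W", "R", "W"]))
# → ['sent q1', 'sent q2']
```

The design point is that the queue decouples "what to send" from "when the socket can accept it", so a full output buffer never forces a blocking send.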

All in all, I think we'll be good for a v3. To be continued...

@dvarrazzo (Member) commented:

Thank you very much @dlax! Very appreciated :)

@dlax dlax mentioned this pull request Dec 8, 2021
@dlax (Contributor, Author) commented Dec 8, 2021

Replaced by #175

@dlax dlax closed this Dec 8, 2021