Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replication support #71

Open
dvarrazzo opened this issue Aug 29, 2021 · 6 comments
Open

Replication support #71

dvarrazzo opened this issue Aug 29, 2021 · 6 comments
Labels
enhancement New feature or request

Comments

@dvarrazzo
Copy link
Member

Add support for streaming replication as in psycopg2

@dvarrazzo dvarrazzo created this issue from a note in psycopg3 first release (Later) Aug 29, 2021
@dvarrazzo dvarrazzo added the enhancement New feature or request label Aug 29, 2021
@ghost
Copy link

ghost commented Jul 15, 2022

Hi,

If this still an issue, I could take a look and use your guidance to implement this.

@dvarrazzo
Copy link
Member Author

If you would like to contribute to it you are welcome.

@dufferzafar
Copy link

dufferzafar commented Jun 12, 2023

@dvarrazzo What sort of support exists today to get logical replication working in an async mode?

I guess logical replication subscription as supported by psycopg2.extras - ReplicationCursor is not currently in psycopg3.

Can I still somehow use the SQL queries directly like (CREATE SLOT, START REPLICATION) to get changes as they happen?

asyncpg mentions this snippet of code in one of their issues regarding replication: MagicStack/asyncpg#91

async def poll_replication_stream(conn):
    while True:
        async for r in conn.fetch('SELECT * FROM pg_logical_slot_get_changes(...)'):
            await put_to_queue(r)
        await asyncio.sleep(polling_interval)

...

conn = await asyncpg.connect(...)
loop.create_task(poll_replication_stream(conn))

I'm assuming something like this will work with psycopg3 as well?

@dufferzafar
Copy link

@dvarrazzo Could you please also explain exactly what would need to be added to support replication directly in psycopg3?

I'm taking a look at the code right now & having some pointers on exactly what needs to be done would be great!

@dvarrazzo
Copy link
Member Author

Replication support is not implemented in psycopg 3. It needs complete redesign and implementation. If you want to contribute its implementation you are welcome, but it is a major work.

@dufferzafar
Copy link

Yeah, I realised that after looking at the code of pg2 & pg3. I guess it'll require a lot of interfacing with pq as well?

My current plan is to use something like wal2json extension on the server, then create a slot and get_changes in the JSON format directly from the server.

Or perhaps I won't do this in async mode and just use pg2 directly. I found pypgoutput which does client side decoding of pgoutput format to JSON: https://github.com/dgea005/pypgoutput

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Development

No branches or pull requests

2 participants