Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use anyio in "core" psycopg #146

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
76 changes: 40 additions & 36 deletions .github/workflows/tests.yml
Expand Up @@ -26,28 +26,29 @@ jobs:
matrix:
include:
# Test different combinations of Python, Postgres, libpq.
- {impl: python, python: "3.7", postgres: "postgres:10", libpq: newest}
- {impl: python, python: "3.8", postgres: "postgres:12"}
- {impl: python, python: "3.9", postgres: "postgres:13"}
- {impl: python, python: "3.10", postgres: "postgres:14"}
- {impl: python, python: "3.11", postgres: "postgres:15", libpq: oldest}
- {impl: python, python: "3.7", postgres: "postgres:10", libpq: newest, pytest_opts: "--anyio=asyncio"}
- {impl: python, python: "3.8", postgres: "postgres:12", pytest_opts: ""}
- {impl: python, python: "3.9", postgres: "postgres:13", pytest_opts: "--anyio=trio"}
- {impl: python, python: "3.10", postgres: "postgres:14", pytest_opts: ""}
- {impl: python, python: "3.11", postgres: "postgres:15", libpq: oldest, pytest_opts: ""}

- {impl: c, python: "3.7", postgres: "postgres:15", libpq: newest}
- {impl: c, python: "3.8", postgres: "postgres:13"}
- {impl: c, python: "3.9", postgres: "postgres:14"}
- {impl: c, python: "3.10", postgres: "postgres:11", libpq: oldest}
- {impl: c, python: "3.11", postgres: "postgres:10", libpq: newest}
- {impl: c, python: "3.7", postgres: "postgres:15", libpq: newest, pytest_opts: ""}
- {impl: c, python: "3.8", postgres: "postgres:13", pytest_opts: "--anyio=asyncio"}
- {impl: c, python: "3.9", postgres: "postgres:14", pytest_opts: ""}
- {impl: c, python: "3.10", postgres: "postgres:11", libpq: oldest, pytest_opts: "--anyio=trio"}
- {impl: c, python: "3.11", postgres: "postgres:10", libpq: newest, pytest_opts: ""}

- {impl: python, python: "3.9", ext: dns, postgres: "postgres:14"}
- {impl: python, python: "3.9", ext: postgis, postgres: "postgis/postgis"}
- {impl: python, python: "3.9", ext: dns, postgres: "postgres:14", pytest_opts: ""}
- {impl: python, python: "3.9", ext: postgis, postgres: "postgis/postgis", pytest_opts: ""}

# Test with minimum dependencies versions
- {impl: c, python: "3.7", ext: min, postgres: "postgres:15"}
- {impl: c, python: "3.7", ext: min, postgres: "postgres:15", pytest_opts: ""}

env:
PSYCOPG_IMPL: ${{ matrix.impl }}
DEPS: ./psycopg[test] ./psycopg_pool
PSYCOPG_TEST_DSN: "host=127.0.0.1 user=postgres password=password"
PYTEST_ADDOPTS: ${{ matrix.pytest_opts }}
MARKERS: ""

steps:
Expand Down Expand Up @@ -77,7 +78,7 @@ jobs:
- name: Include dnspython to the packages to install
if: ${{ matrix.ext == 'dns' }}
run: |
echo "DEPS=$DEPS dnspython" >> $GITHUB_ENV
echo "DEPS=$DEPS anyio[trio] dnspython" >> $GITHUB_ENV
echo "MARKERS=$MARKERS dns" >> $GITHUB_ENV

- name: Include shapely to the packages to install
Expand Down Expand Up @@ -110,21 +111,22 @@ jobs:
fail-fast: false
matrix:
include:
- {impl: python, python: "3.7"}
- {impl: python, python: "3.8"}
- {impl: python, python: "3.9"}
- {impl: python, python: "3.10"}
- {impl: python, python: "3.11"}
- {impl: c, python: "3.7"}
- {impl: c, python: "3.8"}
- {impl: c, python: "3.9"}
- {impl: c, python: "3.10"}
- {impl: c, python: "3.11"}
- {impl: python, python: "3.7", pytest_opts: ""}
- {impl: python, python: "3.8", pytest_opts: "--anyio=asyncio"}
- {impl: python, python: "3.9", pytest_opts: ""}
- {impl: python, python: "3.10", pytest_opts: "--anyio=trio"}
- {impl: python, python: "3.11", pytest_opts: ""}
- {impl: c, python: "3.7", pytest_opts: "--anyio=trio"}
- {impl: c, python: "3.8", pytest_opts: ""}
- {impl: c, python: "3.9", pytest_opts: "--anyio=asyncio"}
- {impl: c, python: "3.10", pytest_opts: ""}
- {impl: c, python: "3.11", pytest_opts: ""}

env:
PSYCOPG_IMPL: ${{ matrix.impl }}
DEPS: ./psycopg[test] ./psycopg_pool
PSYCOPG_TEST_DSN: "host=127.0.0.1 user=runner dbname=postgres"
PYTEST_ADDOPTS: ${{ matrix.pytest_opts }}
# MacOS on GitHub Actions seems particularly slow.
# Don't run timing-based tests as they regularly fail.
# pproxy-based tests fail too, with the proxy not coming up in 2s.
Expand Down Expand Up @@ -165,21 +167,22 @@ jobs:
fail-fast: false
matrix:
include:
- {impl: python, python: "3.7"}
- {impl: python, python: "3.8"}
- {impl: python, python: "3.9"}
- {impl: python, python: "3.10"}
- {impl: python, python: "3.11"}
- {impl: c, python: "3.7"}
- {impl: c, python: "3.8"}
- {impl: c, python: "3.9"}
- {impl: c, python: "3.10"}
- {impl: c, python: "3.11"}
- {impl: python, python: "3.7", pytest_opts: "--anyio=asyncio"}
- {impl: python, python: "3.8", pytest_opts: ""}
- {impl: python, python: "3.9", pytest_opts: "--anyio=trio"}
- {impl: python, python: "3.10", pytest_opts: ""}
- {impl: python, python: "3.11", pytest_opts: ""}
- {impl: c, python: "3.7", pytest_opts: ""}
- {impl: c, python: "3.8", pytest_opts: "--anyio=trio"}
- {impl: c, python: "3.9", pytest_opts: ""}
- {impl: c, python: "3.10", pytest_opts: "--anyio=asyncio"}
- {impl: c, python: "3.11", pytest_opts: ""}

env:
PSYCOPG_IMPL: ${{ matrix.impl }}
DEPS: ./psycopg[test] ./psycopg_pool
PSYCOPG_TEST_DSN: "host=127.0.0.1 dbname=postgres"
PYTEST_ADDOPTS: ${{ matrix.pytest_opts }}
# On windows pproxy doesn't seem very happy. Also a few timing test fail.
NOT_MARKERS: "timing proxy mypy"

Expand Down Expand Up @@ -235,12 +238,13 @@ jobs:
fail-fast: false
matrix:
include:
- {impl: c, crdb: "latest-v22.1", python: "3.10", libpq: newest}
- {impl: python, crdb: "latest-v22.2", python: "3.11"}
- {impl: c, crdb: "latest-v22.1", python: "3.10", libpq: newest, pytest_opts: "--anyio=trio"}
- {impl: python, crdb: "latest-v22.2", python: "3.11", pytest_opts: ""}
env:
PSYCOPG_IMPL: ${{ matrix.impl }}
DEPS: ./psycopg[test] ./psycopg_pool
PSYCOPG_TEST_DSN: "host=127.0.0.1 port=26257 user=root dbname=defaultdb"
PYTEST_ADDOPTS: ${{ matrix.pytest_opts }}

steps:
- uses: actions/checkout@v3
Expand Down
15 changes: 13 additions & 2 deletions docs/api/connections.rst
Expand Up @@ -401,8 +401,8 @@ The `!Connection` class
.. _pg_prepared_xacts: https://www.postgresql.org/docs/current/static/view-pg-prepared-xacts.html


The `!AsyncConnection` class
----------------------------
The `!AsyncConnection` classes
------------------------------

.. autoclass:: AsyncConnection()

Expand Down Expand Up @@ -487,3 +487,14 @@ The `!AsyncConnection` class
.. automethod:: tpc_commit
.. automethod:: tpc_rollback
.. automethod:: tpc_recover


.. autoclass:: AnyIOConnection()

This is class is similar to `AsyncConnection` but uses anyio_ as an
asynchronous library instead of `asyncio`.

To use this class, run ``pip install "psycopg[anyio]"`` to install
required dependencies.

.. _anyio: https://anyio.readthedocs.io/
2 changes: 2 additions & 0 deletions psycopg/psycopg/__init__.py
Expand Up @@ -25,6 +25,7 @@
from .server_cursor import AsyncServerCursor, ServerCursor
from .client_cursor import AsyncClientCursor, ClientCursor
from .connection_async import AsyncConnection
from ._anyio.connection import AnyIOConnection

from . import dbapi20
from .dbapi20 import BINARY, DATETIME, NUMBER, ROWID, STRING
Expand Down Expand Up @@ -59,6 +60,7 @@
# this is the canonical place to obtain them and should be used by MyPy too,
# so that function signatures are consistent with the documentation.
__all__ = [
"AnyIOConnection",
"AsyncClientCursor",
"AsyncConnection",
"AsyncCopy",
Expand Down
Empty file.
80 changes: 80 additions & 0 deletions psycopg/psycopg/_anyio/connection.py
@@ -0,0 +1,80 @@
"""
psycopg async connection objects using AnyIO
"""

# Copyright (C) 2022 The Psycopg Team


from functools import lru_cache
from typing import Any, Optional, TYPE_CHECKING

from .. import errors as e
from ..abc import PQGen, PQGenConn, RV
from ..connection_async import AsyncConnection
from ..rows import Row

if TYPE_CHECKING:
import anyio
import sniffio
from . import waiting
else:
anyio = sniffio = waiting = None


@lru_cache()
def _import_anyio() -> None:
global anyio, sniffio, waiting
try:
import anyio
import sniffio
from . import waiting
except ImportError as e:
raise ImportError(
"anyio is not installed; run `pip install psycopg[anyio]`"
) from e


class AnyIOConnection(AsyncConnection[Row]):
"""
Asynchronous wrapper for a connection to the database using AnyIO
asynchronous library.
"""

__module__ = "psycopg"

def __init__(self, *args: Any, **kwargs: Any) -> None:
_import_anyio()
self._lockcls = anyio.Lock # type: ignore[assignment]
super().__init__(*args, **kwargs)

@staticmethod
def _async_library() -> str:
_import_anyio()
return sniffio.current_async_library()

@staticmethod
def _getaddrinfo() -> Any:
_import_anyio()
return anyio.getaddrinfo

async def wait(self, gen: PQGen[RV]) -> RV:
try:
return await waiting.wait(gen, self.pgconn.socket)
except KeyboardInterrupt:
# TODO: this doesn't seem to work as it does for sync connections
# see tests/test_concurrency_async.py::test_ctrl_c
# In the test, the code doesn't reach this branch.

# On Ctrl-C, try to cancel the query in the server, otherwise
# otherwise the connection will be stuck in ACTIVE state
c = self.pgconn.get_cancel()
c.cancel()
try:
await waiting.wait(gen, self.pgconn.socket)
except e.QueryCanceled:
pass # as expected
raise

@classmethod
async def _wait_conn(cls, gen: PQGenConn[RV], timeout: Optional[int]) -> RV:
return await waiting.wait_conn(gen, timeout)
140 changes: 140 additions & 0 deletions psycopg/psycopg/_anyio/waiting.py
@@ -0,0 +1,140 @@
"""
Async waiting functions using AnyIO.
"""

# Copyright (C) 2022 The Psycopg Team


import socket
from typing import Optional

import anyio

from .. import errors as e
from ..abc import PQGen, PQGenConn, RV
from ..waiting import Ready, Wait


def _fromfd(fileno: int) -> socket.socket:
# AnyIO's wait_socket_readable() and wait_socket_writable() functions work
# with socket object (despite the underlying async libraries -- asyncio and
# trio -- accept integer 'fileno' values):
# https://github.com/agronholm/anyio/issues/386
try:
return socket.fromfd(fileno, socket.AF_INET, socket.SOCK_STREAM)
except OSError as exc:
raise e.OperationalError(
f"failed to build a socket from connection file descriptor: {exc}"
)


async def wait(gen: PQGen[RV], fileno: int) -> RV:
"""
Coroutine waiting for a generator to complete.

:param gen: a generator performing database operations and yielding
`Ready` values when it would block.
:param fileno: the file descriptor to wait on.
:return: whatever *gen* returns on completion.

Behave like in `waiting.wait()`, but exposing an `anyio` interface.
"""
s: Wait
ready: Ready
sock = _fromfd(fileno)

async def readable(ev: anyio.Event) -> None:
await anyio.wait_socket_readable(sock)
nonlocal ready
ready |= Ready.R # type: ignore[assignment]
ev.set()

async def writable(ev: anyio.Event) -> None:
await anyio.wait_socket_writable(sock)
nonlocal ready
ready |= Ready.W # type: ignore[assignment]
ev.set()

try:
s = next(gen)
while True:
reader = s & Wait.R
writer = s & Wait.W
if not reader and not writer:
raise e.InternalError(f"bad poll status: {s}")
ev = anyio.Event()
ready = 0 # type: ignore[assignment]
async with anyio.create_task_group() as tg:
if reader:
tg.start_soon(readable, ev)
if writer:
tg.start_soon(writable, ev)
await ev.wait()
tg.cancel_scope.cancel() # Move on upon first task done.

s = gen.send(ready)

except StopIteration as ex:
rv: RV = ex.args[0] if ex.args else None
return rv

finally:
sock.close()


async def wait_conn(gen: PQGenConn[RV], timeout: Optional[float] = None) -> RV:
"""
Coroutine waiting for a connection generator to complete.

:param gen: a generator performing database operations and yielding
(fd, `Ready`) pairs when it would block.
:param timeout: timeout (in seconds) to check for other interrupt, e.g.
to allow Ctrl-C. If zero or None, wait indefinitely.
:return: whatever *gen* returns on completion.

Behave like in `waiting.wait()`, but take the fileno to wait from the
generator itself, which might change during processing.
"""
s: Wait
ready: Ready

async def readable(sock: socket.socket, ev: anyio.Event) -> None:
await anyio.wait_socket_readable(sock)
nonlocal ready
ready = Ready.R
ev.set()

async def writable(sock: socket.socket, ev: anyio.Event) -> None:
await anyio.wait_socket_writable(sock)
nonlocal ready
ready = Ready.W
ev.set()

timeout = timeout or None
try:
fileno, s = next(gen)

while True:
reader = s & Wait.R
writer = s & Wait.W
if not reader and not writer:
raise e.InternalError(f"bad poll status: {s}")
ev = anyio.Event()
ready = 0 # type: ignore[assignment]
with _fromfd(fileno) as sock:
async with anyio.create_task_group() as tg:
if reader:
tg.start_soon(readable, sock, ev)
if writer:
tg.start_soon(writable, sock, ev)
with anyio.fail_after(timeout):
await ev.wait()

fileno, s = gen.send(ready)

except TimeoutError:
raise e.OperationalError("timeout expired")

except StopIteration as ex:
rv: RV = ex.args[0] if ex.args else None
return rv