Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add psycopg3 as a new backend #586

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions .github/workflows/test-suite.yml
Expand Up @@ -18,18 +18,18 @@ jobs:

services:
mysql:
image: mysql:5.7
image: mariadb:11
env:
MYSQL_USER: username
MYSQL_PASSWORD: password
MYSQL_ROOT_PASSWORD: password
MYSQL_DATABASE: testsuite
ports:
- 3306:3306
options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3
options: --health-cmd="mariadb-admin ping" --health-interval=10s --health-timeout=5s --health-retries=3

postgres:
image: postgres:14
image: postgres:16
env:
POSTGRES_USER: username
POSTGRES_PASSWORD: password
Expand Down Expand Up @@ -59,5 +59,6 @@ jobs:
mysql+asyncmy://username:password@localhost:3306/testsuite,
postgresql://username:password@localhost:5432/testsuite,
postgresql+aiopg://username:password@127.0.0.1:5432/testsuite,
postgresql+asyncpg://username:password@localhost:5432/testsuite
postgresql+asyncpg://username:password@localhost:5432/testsuite,
postgresql+psycopg://username:password@localhost:5432/testsuite
run: "scripts/test"
3 changes: 3 additions & 0 deletions README.md
Expand Up @@ -33,6 +33,7 @@ Database drivers supported are:

* [asyncpg][asyncpg]
* [aiopg][aiopg]
* [psycopg3][psycopg3]
* [aiomysql][aiomysql]
* [asyncmy][asyncmy]
* [aiosqlite][aiosqlite]
Expand All @@ -42,6 +43,7 @@ You can install the required database drivers with:
```shell
$ pip install databases[asyncpg]
$ pip install databases[aiopg]
$ pip install databases[psycopg3]
$ pip install databases[aiomysql]
$ pip install databases[asyncmy]
$ pip install databases[aiosqlite]
Expand Down Expand Up @@ -105,6 +107,7 @@ for examples of how to start using databases together with SQLAlchemy core expre
[pymysql]: https://github.com/PyMySQL/PyMySQL
[asyncpg]: https://github.com/MagicStack/asyncpg
[aiopg]: https://github.com/aio-libs/aiopg
[psycopg3]: https://github.com/psycopg/psycopg
[aiomysql]: https://github.com/aio-libs/aiomysql
[asyncmy]: https://github.com/long2ice/asyncmy
[aiosqlite]: https://github.com/omnilib/aiosqlite
Expand Down
1 change: 0 additions & 1 deletion databases/backends/aiopg.py
Expand Up @@ -7,7 +7,6 @@
import aiopg
from sqlalchemy.engine.cursor import CursorResultMetaData
from sqlalchemy.engine.interfaces import Dialect, ExecutionContext
from sqlalchemy.engine.row import Row
from sqlalchemy.sql import ClauseElement
from sqlalchemy.sql.ddl import DDLElement

Expand Down
16 changes: 8 additions & 8 deletions databases/backends/postgres.py → databases/backends/asyncpg.py
Expand Up @@ -19,7 +19,7 @@
logger = logging.getLogger("databases")


class PostgresBackend(DatabaseBackend):
class AsyncpgBackend(DatabaseBackend):
def __init__(
self, database_url: typing.Union[DatabaseURL, str], **options: typing.Any
) -> None:
Expand Down Expand Up @@ -78,12 +78,12 @@ async def disconnect(self) -> None:
await self._pool.close()
self._pool = None

def connection(self) -> "PostgresConnection":
return PostgresConnection(self, self._dialect)
def connection(self) -> "AsyncpgConnection":
return AsyncpgConnection(self, self._dialect)


class PostgresConnection(ConnectionBackend):
def __init__(self, database: PostgresBackend, dialect: Dialect):
class AsyncpgConnection(ConnectionBackend):
def __init__(self, database: AsyncpgBackend, dialect: Dialect):
self._database = database
self._dialect = dialect
self._connection: typing.Optional[asyncpg.connection.Connection] = None
Expand Down Expand Up @@ -159,7 +159,7 @@ async def iterate(
yield Record(row, result_columns, self._dialect, column_maps)

def transaction(self) -> TransactionBackend:
return PostgresTransaction(connection=self)
return AsyncpgTransaction(connection=self)

def _compile(self, query: ClauseElement) -> typing.Tuple[str, list, tuple]:
compiled = query.compile(
Expand Down Expand Up @@ -197,8 +197,8 @@ def raw_connection(self) -> asyncpg.connection.Connection:
return self._connection


class PostgresTransaction(TransactionBackend):
def __init__(self, connection: PostgresConnection):
class AsyncpgTransaction(TransactionBackend):
def __init__(self, connection: AsyncpgConnection):
self._connection = connection
self._transaction: typing.Optional[asyncpg.transaction.Transaction] = None

Expand Down
3 changes: 0 additions & 3 deletions databases/backends/common/records.py
@@ -1,12 +1,9 @@
import enum
import typing
from datetime import date, datetime, time

from sqlalchemy.engine.interfaces import Dialect
from sqlalchemy.engine.row import Row as SQLRow
from sqlalchemy.sql.compiler import _CompileLabel
from sqlalchemy.sql.schema import Column
from sqlalchemy.sql.sqltypes import JSON
from sqlalchemy.types import TypeEngine

from databases.interfaces import Record as RecordInterface
Expand Down
275 changes: 275 additions & 0 deletions databases/backends/psycopg.py
@@ -0,0 +1,275 @@
import typing

import psycopg
import psycopg.adapt
import psycopg.types
import psycopg_pool
from psycopg.rows import namedtuple_row
from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg
from sqlalchemy.engine.interfaces import Dialect
from sqlalchemy.sql import ClauseElement
from sqlalchemy.sql.schema import Column

from databases.backends.common.records import Record, create_column_maps
from databases.core import DatabaseURL
from databases.interfaces import (
ConnectionBackend,
DatabaseBackend,
Record as RecordInterface,
TransactionBackend,
)

try:
import orjson

def load(data):
return orjson.loads(data)

def dump(data):
return orjson.dumps(data)

except ImportError:
import json

def load(data):
return json.loads(data.decode("utf-8"))

def dump(data):
return json.dumps(data).encode("utf-8")


class JsonLoader(psycopg.adapt.Loader):
def load(self, data):
return load(data)


class JsonDumper(psycopg.adapt.Dumper):
def dump(self, data):
return dump(data)


class PsycopgBackend(DatabaseBackend):
_database_url: DatabaseURL
_options: typing.Dict[str, typing.Any]
_dialect: Dialect
_pool: typing.Optional[psycopg_pool.AsyncConnectionPool] = None

def __init__(
self,
database_url: typing.Union[DatabaseURL, str],
**options: typing.Dict[str, typing.Any],
) -> None:
self._database_url = DatabaseURL(database_url)
self._options = options
self._dialect = PGDialect_psycopg()
self._dialect.implicit_returning = True

async def connect(self) -> None:
if self._pool is not None:
return

url = self._database_url._url.replace("postgresql+psycopg", "postgresql")
self._pool = psycopg_pool.AsyncConnectionPool(url, open=False, **self._options)

# TODO: Add configurable timeouts
await self._pool.open()

async def disconnect(self) -> None:
if self._pool is None:
return

# TODO: Add configurable timeouts
await self._pool.close()
self._pool = None

def connection(self) -> "PsycopgConnection":
return PsycopgConnection(self, self._dialect)


class PsycopgConnection(ConnectionBackend):
_database: PsycopgBackend
_dialect: Dialect
_connection: typing.Optional[psycopg.AsyncConnection] = None

def __init__(self, database: PsycopgBackend, dialect: Dialect) -> None:
self._database = database
self._dialect = dialect

async def acquire(self) -> None:
if self._connection is not None:
return

if self._database._pool is None:
raise RuntimeError("PsycopgBackend is not running")

# TODO: Add configurable timeouts
connection = await self._database._pool.getconn()
connection.adapters.register_loader("json", JsonLoader)
connection.adapters.register_loader("jsonb", JsonLoader)
connection.adapters.register_dumper(dict, JsonDumper)
connection.adapters.register_dumper(list, JsonDumper)
await connection.set_autocommit(True)
self._connection = connection

async def release(self) -> None:
if self._connection is None:
return

await self._database._pool.putconn(self._connection)
self._connection = None

async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]:
if self._connection is None:
raise RuntimeError("Connection is not acquired")

query_str, args, result_columns = self._compile(query)

async with self._connection.cursor(row_factory=namedtuple_row) as cursor:
await cursor.execute(query_str, args)
rows = await cursor.fetchall()

column_maps = create_column_maps(result_columns)
return [
PsycopgRecord(row, result_columns, self._dialect, column_maps)
for row in rows
]

async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterface]:
if self._connection is None:
raise RuntimeError("Connection is not acquired")

query_str, args, result_columns = self._compile(query)

async with self._connection.cursor(row_factory=namedtuple_row) as cursor:
await cursor.execute(query_str, args)
row = await cursor.fetchone()

if row is None:
return None

return PsycopgRecord(
row,
result_columns,
self._dialect,
create_column_maps(result_columns),
)

async def fetch_val(
self, query: ClauseElement, column: typing.Any = 0
) -> typing.Any:
row = await self.fetch_one(query)
return None if row is None else row[column]

async def execute(self, query: ClauseElement) -> typing.Any:
if self._connection is None:
raise RuntimeError("Connection is not acquired")

query_str, args, _ = self._compile(query)

async with self._connection.cursor(row_factory=namedtuple_row) as cursor:
await cursor.execute(query_str, args)

async def execute_many(self, queries: typing.List[ClauseElement]) -> None:
# TODO: Find a way to use psycopg's executemany
for query in queries:
await self.execute(query)

async def iterate(
self, query: ClauseElement
) -> typing.AsyncGenerator[typing.Mapping, None]:
if self._connection is None:
raise RuntimeError("Connection is not acquired")

query_str, args, result_columns = self._compile(query)
column_maps = create_column_maps(result_columns)

async with self._connection.cursor(row_factory=namedtuple_row) as cursor:
await cursor.execute(query_str, args)

while True:
row = await cursor.fetchone()

if row is None:
break

yield PsycopgRecord(row, result_columns, self._dialect, column_maps)

def transaction(self) -> "TransactionBackend":
return PsycopgTransaction(connection=self)

@property
def raw_connection(self) -> typing.Any:
if self._connection is None:
raise RuntimeError("Connection is not acquired")
return self._connection

def _compile(
self,
query: ClauseElement,
) -> typing.Tuple[str, typing.Mapping[str, typing.Any], tuple]:
compiled = query.compile(
dialect=self._dialect,
compile_kwargs={"render_postcompile": True},
)

compiled_query = compiled.string
params = compiled.params
result_map = compiled._result_columns

return compiled_query, params, result_map


class PsycopgTransaction(TransactionBackend):
_connecttion: PsycopgConnection
_transaction: typing.Optional[psycopg.AsyncTransaction]

def __init__(self, connection: PsycopgConnection):
self._connection = connection
self._transaction: typing.Optional[psycopg.AsyncTransaction] = None

async def start(
self, is_root: bool, extra_options: typing.Dict[typing.Any, typing.Any]
) -> None:
if self._connection._connection is None:
raise RuntimeError("Connection is not acquired")

transaction = psycopg.AsyncTransaction(
self._connection._connection, **extra_options
)
async with transaction._conn.lock:
await transaction._conn.wait(transaction._enter_gen())
self._transaction = transaction

async def commit(self) -> None:
if self._transaction is None:
raise RuntimeError("Transaction was not started")

async with self._transaction._conn.lock:
await self._transaction._conn.wait(self._transaction._commit_gen())

async def rollback(self) -> None:
if self._transaction is None:
raise RuntimeError("Transaction was not started")

async with self._transaction._conn.lock:
await self._transaction._conn.wait(self._transaction._rollback_gen(None))


class PsycopgRecord(Record):
@property
def _mapping(self) -> typing.Mapping:
return self._row._asdict()

def __getitem__(self, key: typing.Any) -> typing.Any:
if len(self._column_map) == 0:
if isinstance(key, str):
return self._mapping[key]
return self._row[key]
elif isinstance(key, Column):
idx, datatype = self._column_map_full[str(key)]
elif isinstance(key, int):
idx, datatype = self._column_map_int[key]
else:
idx, datatype = self._column_map[key]

return self._row[idx]