Create example code to demo how to use asyncpg in server and non-server environments with a global connection pool #1111

Open · wants to merge 2 commits into master
67 changes: 67 additions & 0 deletions examples/global_pool/README.md
@@ -0,0 +1,67 @@

This example shows how to use asyncpg conveniently in both server and non-server environments. Users coming from pymongo should find the demo familiar, and it does not require writing data-type schemas the way a typical ORM does. The workflow is adapted for users of non-async database drivers: first get a database handle, then a table handle, and then run the code.

The demo code provides a PG class, which stores connection pools in a global dictionary keyed by database name. Each database has exactly one pool, and different PG instances can share the same pool.
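As a minimal sketch of that sharing (assuming `pg.py` is importable and a PostgreSQL server is reachable with these placeholder credentials):

```python
import asyncio

from pg import PG, DB  # DB is the module-level pool registry


async def main():
    a = PG("localhost", 5432, "postgres", "secret", "mydb")
    b = PG("localhost", 5432, "postgres", "secret", "mydb")
    await a.db_pool()
    await b.db_pool()
    # Both instances resolve to the single pool cached under DB["mydb"].
    assert a.db is b.db and DB["mydb"] is a.db
    a.terminate_pool()


asyncio.run(main())
```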

# Non Server Environment
```python
from pg import PG


# Define your data-processing logic as async functions that receive the table handle.
async def run_many_sqls_in_transaction(table):
    sqls = [
        "sql1",
        "sql2",
        "sql3",
    ]
    await table.trans(sqls)


async def run_one_sql(table):
    sql = "INSERT INTO ......"
    await table.execute(sql)


async def select(table):
    sql = "select ......"
    rows = await table.select(sql)
    print(rows)


pgdb = PG("host", "port", "user", "password", "database")
table = pgdb['test']
table.run(run_many_sqls_in_transaction)
table.run(run_one_sql)
table.run(select)
```

In a non-server environment, the data-processing logic is written as async functions. Running such a function through the PG method `run` guarantees that the connection pool is terminated when the work completes.
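As a rough illustration of that guarantee (placeholder credentials, and assuming a table named `test` exists), the global pool registry is empty again once `run` returns:

```python
from pg import PG, DB


async def insert_row(table):
    # Hypothetical statement; adjust to your schema.
    await table.execute("INSERT INTO test VALUES (1)")


pgdb = PG("localhost", 5432, "postgres", "secret", "mydb")
pgdb['test'].run(insert_row)

# run() executed the coroutine, then terminated and removed the pool.
assert DB.get("mydb") is None
```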

# Server Environment
```python
# Example using FastAPI.

from fastapi import FastAPI

from pg import PG

app = FastAPI()


# No need to initialize the connection pool on startup; the PG class creates it
# automatically the first time it is needed.
@app.on_event("startup")
async def startup_event():
    pass


@app.on_event("shutdown")
async def shutdown_event():
    pgdb = PG("host", "port", "user", "password", "database")
    pgdb.terminate_pool()


@app.get("/xxx")
async def handlers():
    pgdb = PG("host", "port", "user", "password", "database")
    table = pgdb['test']
    data = await table.select("select ....")
    return data
```
The PG class creates the pool on the first call that needs it, and the pool is terminated during the server's shutdown phase.
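Note that newer FastAPI releases deprecate the `on_event` hooks in favour of a lifespan handler; a minimal sketch of the same shutdown logic with that API (same placeholder credentials as above) could look like this:

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from pg import PG


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: nothing to do, the PG class creates the pool lazily on first use.
    yield
    # Shutdown: terminate the shared pool for this database.
    PG("host", "port", "user", "password", "database").terminate_pool()


app = FastAPI(lifespan=lifespan)
```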
75 changes: 75 additions & 0 deletions examples/global_pool/pg.py
@@ -0,0 +1,75 @@
import asyncio

import asyncpg

# Module-level registry of connection pools, keyed by database name.
# Every PG instance pointing at the same database shares the same pool.
DB = {}


class PG:
    def __init__(self, host, port, user, pwd, dbName):
        self.user = user
        self.pwd = pwd
        self.host = host
        self.port = port
        self.dbName = dbName
        self.db = None
        self.table = None

    def __getitem__(self, tb):
        # pymongo-style access: pgdb['test'] selects a table and returns the handle.
        self.table = tb
        return self

    async def db_pool(self):
        # Create the pool for this database on first use and cache it globally.
        if DB.get(self.dbName) is None:
            DB[self.dbName] = await asyncpg.create_pool(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.pwd,
                database=self.dbName,
            )

        self.db = DB[self.dbName]
        print("DB", self.dbName, self.db)
        return self

    # If the caller never invokes this, switching databases can leave stale
    # connections behind (e.g. tables may not show up in pgadmin).
    def terminate_pool(self):
        # A freshly constructed instance has self.db = None, so also look up the
        # shared pool in the global registry before terminating.
        pool = self.db or DB.get(self.dbName)
        print("will terminate", pool)
        if pool is not None:
            pool.terminate()
        DB.pop(self.dbName, None)
        self.db = None

    async def check(self):
        if self.dbName is None:
            raise ValueError("no db name")
        if self.table is None:
            raise ValueError("no table name")
        if self.db is None:
            await self.db_pool()

    async def execute(self, sql):
        print("sql===", sql)
        await self.check()
        async with self.db.acquire() as conn:
            await conn.execute(sql)

    async def trans(self, sqls):
        # Run several statements in a single transaction on one connection.
        await self.check()
        async with self.db.acquire() as conn:
            async with conn.transaction():
                for sql in sqls:
                    print("sql===", sql)
                    await conn.execute(sql)

    async def select(self, sql):
        await self.check()
        async with self.db.acquire() as conn:
            rows = await conn.fetch(sql)
            return [dict(r) for r in rows]

    def run(self, f):
        # Run an async task against this table, then tear the pool down so a
        # short-lived (non-server) script exits cleanly.
        print(self.dbName, self.table)
        asyncio.get_event_loop().run_until_complete(f(self))
        self.terminate_pool()
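For completeness, a minimal standalone sketch of calling the awaitable methods directly (as a server handler would) instead of going through `run` — placeholder credentials, and the CREATE TABLE statement is only for illustration:

```python
import asyncio

from pg import PG


async def main():
    pgdb = PG("localhost", 5432, "postgres", "secret", "mydb")
    table = pgdb['test']
    await table.execute("CREATE TABLE IF NOT EXISTS test (id int)")
    rows = await table.select("SELECT * FROM test")
    print(rows)
    # Terminate explicitly since run() is not used here.
    pgdb.terminate_pool()


asyncio.run(main())
```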