This issue was moved to a discussion.
You can continue the conversation there. Go to discussion →
SQLAlchemy Dependency vs. Middleware vs. scoped_session #726
Comments
I'm not sure about the reasoning behind the doc's recommendations, but the reason why I'd personally advise against making SQLAlchemy's own doc says I personally prefer handling my connections this way:

from typing import AsyncIterable, Optional

from fastapi import Depends, FastAPI
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine as Database
from sqlalchemy.orm import Session

app = FastAPI()

# Module-level placeholder, initialised to None so it can be checked before startup has run
_db_conn: Optional[Database] = None

@app.on_event("startup")
def open_database_connection_pools():
    global _db_conn
    _db_conn = create_engine(...)

@app.on_event("shutdown")
def close_database_connection_pools():
    global _db_conn
    if _db_conn:
        _db_conn.dispose()

async def get_db_conn() -> Database:
    assert _db_conn is not None
    return _db_conn

# This is the part that replaces sessionmaker
async def get_db_sess(db_conn = Depends(get_db_conn)) -> AsyncIterable[Session]:
    sess = Session(bind=db_conn)
    try:
        yield sess
    finally:
        # Always release the session once the request is done with it
        sess.close()

@app.get("/")
def foo(db_sess: Session = Depends(get_db_sess)):
    pass

The benefits of this approach are:
|
Thanks for the in-depth response @sm-Fifteen! Those do seem like some good benefits over using One clarification though: you initially mentioned how it's probably better not to have a global database connection, but then your solution is just passing around the connection as a global, no? You add a layer of indirection with the database connection dependency, but I'm not sure I see the benefit of that approach. |
Yeah, I should probably have explained that one a bit more. The important distinction is that while To go back to the example I was giving:

# Initialised to None so the checks below work even if startup never ran
_db_conn: Optional[Database] = None

def open_database_connection_pools():
    global _db_conn
    _db_conn = create_engine(...)

def close_database_connection_pools():
    global _db_conn
    if _db_conn:
        _db_conn.dispose()

async def get_db_conn() -> Database:
    assert _db_conn is not None
    return _db_conn

...these three functions should be the only places where This weird pattern with the I should specify that in SQLAlchemy's case specifically, this is probably not needed (unless you want to make use of |
How about this kind of pattern https://github.com/mfreeborn/fastapi-sqlalchemy similar to the way it works in flask? |
It's fine, I guess, but it limits you to a single database connection (I need to connect to at least 3 on one of the projects I'm developing) and it doesn't take advantage of dependency injection, so you can't really make your routes or functions use a different DB object without modifying their code or the initialization sequence of your middleware. |
@acnebs: You might want to close this issue if your question has been answered. |
Note that there is now also a contextmanager-style approach to getting a session that may be preferable to using a middleware if not all endpoints rely on the database. This is documented here. |
I don't know if this is a good idea with database connection pools; the way I see it, it's probably preferable to open them once when starting the application and to pull connections from those as dependencies. There are valid use cases for context manager dependencies, but I'm not sure if this is one of those. |
The contextmanager approach is almost exactly equivalent to the middleware approach, so I'm not sure what you mean with regard to database connection pools. In particular, the connection pool is still only opened once when starting the application (as long as The only differences between the contextmanager approach and the middleware approach, as far as I'm aware, are that 1) the contextmanager approach doesn't clean up the session until after sending the response (so you need to handle commits and any associated errors inside your endpoint, which I would argue you should do anyway unless you want to be returning 500s to your clients), and 2) in the contextmanager approach, a session isn't automatically created for every request -- it is only created for endpoint calls that actually make use of the session dependency. If your point was just that you don't like the middleware approach either, that's fine (although I haven't seen any argument against the use of a global |
Oh, I agree that sessions should be handled with context manager dependencies. I meant that more for the underlying database connection pool, which should be instantiated once for the application and then reused (I realize I'm probably getting annoying with this talk of lifetime-scoped dependencies, but I think it's really really important, you know?), which neither the middleware approach nor the ctxmgr dependencies would be well suited for. |
@acnebs about your question, SQLAlchemy For example, Peewee doesn't have a way to not use global state for connections, so it requires several tricks with ContextVars (you can read more about it here in the docs: https://fastapi.tiangolo.com/advanced/sql-databases-peewee/). |
@tiangolo @sm-Fifteen Some very good points. Just wanted to make sure the reasoning behind things was clear. This is very helpful for my own (and hopefully others') understanding! |
@tiangolo could you perchance comment on @sm-Fifteen's discussion of not using a globally-instantiated |
Starlette now discourages these https://github.com/encode/starlette/blob/master/starlette/applications.py#L114-L115
@tiangolo Could you provide guidance on how to best manage DB lifecycle in consideration of all the various factors ( |
A few additional questions:
Putting it all together, we have something like this:
|
Yes, would appreciate some updated / enhanced guidelines in the documentation around this. Especially also in light of dmontagu/fastapi-utils#174 |
FWIW, I have tried this, and it doesn't work, for exactly the reason you're describing. I had several co-routines trying to start transactions on the same thread-local session and had to abandon this approach. |
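The failure described above can be reproduced without SQLAlchemy at all. The following is a minimal stdlib-only sketch, assuming a thread-local registry that hands out one session per thread; `FakeSession` and `get_thread_local_session` are hypothetical stand-ins, not SQLAlchemy APIs. Because every coroutine on the event loop runs on the same thread, all "requests" end up with the same session object.

```python
import asyncio
import threading

_local = threading.local()


class FakeSession:
    """Hypothetical stand-in for a database session."""


def get_thread_local_session() -> FakeSession:
    # Mimics a thread-local registry: one session per *thread*, not per request.
    if not hasattr(_local, "session"):
        _local.session = FakeSession()
    return _local.session


async def handle_request() -> int:
    await asyncio.sleep(0)  # yield to the loop so the requests interleave
    return id(get_thread_local_session())


async def main() -> set:
    ids = await asyncio.gather(*(handle_request() for _ in range(3)))
    return set(ids)


# Three concurrent "requests", but only one distinct session among them.
session_ids = asyncio.run(main())
print(len(session_ids))
```

With real sessions, this sharing is what lets several coroutines start transactions on the same session at once.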
EDIT: @tiangolo @sm-Fifteen It's true that the default behavior of The basic idea is that the My solution, which works perfectly fine for me, is to generate a UUID for every incoming request (I use a middleware for this) and store it in a ContextVar, then reset the ContextVar after the request has been processed. Then my scopefunc is just a function that returns the current value of the ContextVar.

import os
from contextvars import ContextVar
from typing import Any, Callable, Final, Hashable, Iterator, Optional
from uuid import uuid1

import sqlalchemy
from fastapi import FastAPI, Request, Response
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, scoped_session

# DBConfig, DIALECT and the config module are application-specific and not shown here.

class Database:
    def __init__(self):
        self.engine: Optional[Engine] = None
        self.session: Optional[scoped_session] = None

    def connect(self, config: DBConfig, scopefunc: Callable[[], Hashable]) -> None:
        url = sqlalchemy.engine.url.URL.create(
            DIALECT,
            username=config.username,
            password=config.password,
            host=config.hostname,
            port=config.port,
            database=config.dbname
        )
        self.engine = sqlalchemy.create_engine(url)
        session_factory = sqlalchemy.orm.sessionmaker(bind=self.engine)
        self.session = sqlalchemy.orm.scoped_session(session_factory, scopefunc=scopefunc)

    def get_session(self) -> Session:
        if self.session is None:
            raise RuntimeError('not connected to database')
        return self.session()

    def remove_session(self) -> None:
        if self.session is None:
            raise RuntimeError('not connected to database')
        self.session.remove()

db = Database()

REQUEST_ID_CTX_KEY: Final[str] = 'request_id'
_request_id_ctx_var: ContextVar[Optional[str]] = ContextVar(REQUEST_ID_CTX_KEY, default=None)

def get_request_id() -> Optional[str]:
    return _request_id_ctx_var.get()

api = FastAPI()

@api.on_event("startup")
def create_db() -> None:
    config_path = os.environ.get('CONFIG_PATH')
    db_config = config.load_config(config_path)
    # The keyword must match the parameter name declared on Database.connect
    db.connect(db_config, scopefunc=get_request_id)

@api.middleware("http")
async def set_request_id(request: Request, call_next: Callable[[Request], Any]) -> Response:
    request_id = str(uuid1())
    ctx_token = _request_id_ctx_var.set(request_id)
    response = await call_next(request)
    _request_id_ctx_var.reset(ctx_token)
    return response

# this function is used as a dependency for any route or dependency that needs a DB session
def db_session() -> Iterator[Session]:
    session = db.get_session()
    try:
        yield session
    except Exception:
        session.rollback()
        raise
    else:
        # Commit only when the request finished without raising
        session.commit()
    finally:
        db.remove_session()

Hope others can find this helpful. Would probably make sense to include something about scopefunc and context vars in the documentation. |
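To make the scoping idea in this comment concrete without a database, here is a stdlib-only sketch, assuming a registry that hands out one object per key returned by a scopefunc. `ScopedRegistry` here is a hypothetical toy that only imitates the idea and is not SQLAlchemy's class of the same name; `FakeSession` is likewise a stand-in, and `uuid4` is used instead of `uuid1` purely for illustration.

```python
import asyncio
from contextvars import ContextVar
from typing import Callable, Dict, Hashable, Optional
from uuid import uuid4

request_id_var: ContextVar[Optional[str]] = ContextVar("request_id", default=None)


class FakeSession:
    """Hypothetical stand-in for a database session."""


class ScopedRegistry:
    """Toy registry: one object per scope key (illustrative only)."""

    def __init__(self, factory: Callable[[], FakeSession],
                 scopefunc: Callable[[], Hashable]) -> None:
        self._factory = factory
        self._scopefunc = scopefunc
        self._items: Dict[Hashable, FakeSession] = {}

    def __call__(self) -> FakeSession:
        key = self._scopefunc()
        if key not in self._items:
            self._items[key] = self._factory()
        return self._items[key]

    def remove(self) -> None:
        self._items.pop(self._scopefunc(), None)

    def active_count(self) -> int:
        return len(self._items)


registry = ScopedRegistry(FakeSession, request_id_var.get)


async def handle_request():
    # Plays the role of the middleware: set the request ID, reset it afterwards.
    token = request_id_var.set(str(uuid4()))
    try:
        first = registry()
        await asyncio.sleep(0)  # other requests run here
        second = registry()
        return first, second
    finally:
        registry.remove()  # must happen before the ContextVar is reset
        request_id_var.reset(token)


async def main():
    return await asyncio.gather(*(handle_request() for _ in range(3)))


results = asyncio.run(main())
same_within_request = all(first is second for first, second in results)
distinct_across_requests = len({id(first) for first, _ in results}) == 3
```

The ordering in the `finally` block matters in this sketch: if the variable were reset before `remove()` ran, the scopefunc would return a different key and the session would stay in the registry.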
I should add that I also have a cleanup method that is triggered on |
I also found this in the SQLAlchemy docs, regarding the new async support in SQLAlchemy 1.4. |
I tried @mrpgraae's approach but there is a flaw in it. It happens after the request has been handled, when the session is being closed. The problem is that FastAPI would reset the context variable in the The solution would be to create and close the sessions in the middleware layer, without involving dependencies. This would be similar to what the fastapi-sqlalchemy middleware library does, which again boils down to @sm-Fifteen's assertion about global variables in ASGI, and why having the engine as a global variable is a bad idea. |
Hey @kvelakur thanks for pointing this out. I just found out recently, that for dependencies that use I'm wondering how you discovered this issue? I have been running the above code in a live system for quite a while now without any apparent issues. Though I have noticed a very slowly but steadily increasing memory usage, which I guess might be caused by stale sessions piling up. |
@mrpgraae I tried the approach in prod and ran into "QueuePool limit of size x overflow y reached, connection timed out, timeout z (Background on this error at: https://sqlalche.me/e/14/3o7r)" |
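As a rough illustration of how sessions that are never removed can lead to a pool timeout like the one quoted above, here is a stdlib-only sketch. `TinyPool` is a hypothetical bounded pool built on `queue.Queue`; it is not SQLAlchemy's QueuePool and ignores overflow, but the failure shape is similar: once every connection is checked out and none is returned, the next checkout times out.

```python
import queue


class TinyPool:
    """Hypothetical fixed-size pool (illustrative only)."""

    def __init__(self, size: int) -> None:
        self._free: "queue.Queue[str]" = queue.Queue()
        for n in range(size):
            self._free.put(f"conn-{n}")

    def checkout(self, timeout: float) -> str:
        try:
            return self._free.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError("pool limit reached, connection timed out") from None

    def checkin(self, conn: str) -> None:
        self._free.put(conn)


pool = TinyPool(size=2)

# Two "sessions" take a connection each and are never cleaned up.
leaked = [pool.checkout(0.01), pool.checkout(0.01)]

try:
    pool.checkout(0.01)
    exhausted = False
except TimeoutError:
    exhausted = True

# Returning one connection makes the pool usable again.
pool.checkin(leaked.pop())
recovered = pool.checkout(0.01)
```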
As mentioned earlier, thread-local sessions do not work well in FastAPI because of asyncio. By reading the documentation and working through examples, I found a way to tie the SQLAlchemy session to the request lifecycle in FastAPI. Principle: as described in Using Thread-Local Scope with Web Applications, after the web request starts you initialize the session, do your work, commit, and then, once the request ends, remove the session. This is a common usage pattern for web applications. In Uvicorn, the underlying FastAPI, one

if scopefunc:
    self.registry = ScopedRegistry(session_factory, scopefunc)
else:
    self.registry = ThreadLocalRegistry(session_factory)

A solution is given in Using asyncio scoped session. We can get a shared session for the current

from asyncio import current_task

from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import async_scoped_session
from sqlalchemy.ext.asyncio import AsyncSession

# some_async_engine is assumed to have been created elsewhere
async_session_factory = sessionmaker(some_async_engine, class_=AsyncSession)
# Note: from here on, the name AsyncSession refers to the scoped registry
AsyncSession = async_scoped_session(async_session_factory, scopefunc=current_task)

some_async_session = AsyncSession()

# ...

async def some_function(some_async_session, some_object):
    # use the AsyncSession directly
    some_async_session.add(some_object)

    # use the AsyncSession via the context-local proxy
    await AsyncSession.commit()

    # "remove" the current proxied AsyncSession for the local context
    await AsyncSession.remove()

I have found that in FastAPI it is possible to pass So, I wrote a sample project that also supports AsyncSession; you just need to swap in the commented-out code. I also use FastAPI global dependencies to manage the lifecycle of sessions in context. Example:

from asyncio import current_task
import uvicorn
from fastapi import APIRouter, Depends, FastAPI
from fastapi.requests import Request
from sqlalchemy import Boolean, Column, Integer, String, delete, select, create_engine
from sqlalchemy.ext.asyncio import (AsyncSession, async_scoped_session,
                                    create_async_engine)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session, Session

Base = declarative_base()


class UserModel(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String(255), unique=True)
    hashed_password = Column(String(255))
    is_active = Column(Boolean, default=True)

    def __repr__(self):
        return f"<User(id={self.id}, " \
               f"email=\"{self.email}\", " \
               f"hashed_password=\"{self.hashed_password}\", " \
               f"is_active={self.is_active})>"


class Database:
    def __init__(self, db_url) -> None:
        # self._engine = create_async_engine(db_url, future=True, echo=True)
        self._engine = create_engine(db_url, future=True, echo=True)
        # self._sessionmaker = sessionmaker(self._engine, class_=AsyncSession, expire_on_commit=False)
        self._sessionmaker = sessionmaker(self._engine, expire_on_commit=False)
        # self._scoped_session = async_scoped_session(self._sessionmaker, scopefunc=current_task)
        self._scoped_session = scoped_session(self._sessionmaker, scopefunc=current_task)

    @property
    # def async_scoped_session(self):
    def scoped_session(self):
        return self._scoped_session

    @property
    def session(self):
        """init current context-level session"""
        return self._scoped_session()

    # async def create_database(self) -> None:
    #     async with self._engine.begin() as conn:
    #         await conn.run_sync(Base.metadata.create_all)
    def create_database(self) -> None:
        Base.metadata.create_all(self._engine)


class BaseService:
    MODEL: Base

    # def __init__(self, session: AsyncSession) -> None:
    def __init__(self, session: Session) -> None:
        self._session = session

    @property
    # def session(self) -> AsyncSession:
    def session(self) -> Session:
        return self._session

    # async def get_all(self):
    #     result = await self.session.execute(select(self.MODEL))
    #     return result.scalars().all()
    def get_all(self):
        result = self.session.execute(select(self.MODEL))
        return result.scalars().all()


class UserService(BaseService):
    MODEL = UserModel


class Container:
    # db = Database('sqlite+aiosqlite:////tmp/blog.db')
    db = Database('sqlite:////tmp/blog.db')

    @property
    def user_service(self):
        return UserService(self.db.session)


router = APIRouter()


async def user_service_dependency(request: Request):
    container = request.app.extra.get('container')
    yield container.user_service


@router.get("/users")
async def get_list(
        user_service: UserService = Depends(user_service_dependency),
):
    # return await user_service.get_all()
    return user_service.get_all()


async def scoping_session(request: Request):
    container = request.app.extra.get('container')
    # async_session = container.db.async_session
    async_session = container.db.scoped_session
    yield
    # await async_session.remove()
    async_session.remove()


def create_app() -> FastAPI:
    container = Container()
    _app = FastAPI(
        dependencies=[Depends(scoping_session), ],
        container=container,
    )
    _app.add_event_handler('startup', container.db.create_database)
    _app.include_router(router)
    return _app


app = create_app()

If you have any questions, welcome to discuss. |
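One property of using `current_task` as the scope key is worth checking in isolation. The stdlib-only sketch below uses no SQLAlchemy and hypothetical function names. It shows that the key stays the same across awaits within one task and differs between concurrent tasks, and that work spawned with `asyncio.create_task` gets a key of its own. Under a task-scoped registry, such child tasks would presumably see a different session than their parent.

```python
import asyncio


async def child_scope():
    # Runs in its own task, so it reports a different current task.
    return asyncio.current_task()


async def handler():
    before = asyncio.current_task()
    await asyncio.sleep(0)  # other handlers run here
    after = asyncio.current_task()
    child = await asyncio.create_task(child_scope())
    return before, after, child


async def main():
    return await asyncio.gather(handler(), handler())


(a_before, a_after, a_child), (b_before, _, _) = asyncio.run(main())

stable_within_task = a_before is a_after
distinct_between_tasks = a_before is not b_before
child_has_own_scope = a_child is not a_before
```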
@whg517 As your app scales you'll need to wire-up every db-dependent service you build as a property in the Container instance as well as create a service_dependency. In addition to that, you have to use FastAPI's built-in dependency injection to inject the service_dependency functions or create some kind of auto-wiring for them. I think the solution you've outlined works better than trying to commit and close SQLAlchemy AsyncSessions with dependency injecting get_db_session that use a try / except / finally handling session commit and close. However, I think it might be cumbersome as a pattern for larger apps. I couldn't find a good solution to this. I ran into issues with creating something like a stateful per-request Session instance within FastAPI's dependency injection framework. I couldn't get all my dependencies to use the same session and have it wire up session commits and rollbacks effectively. The documentation has been updated to recommend a different approach vs when I last checked: https://fastapi.tiangolo.com/advanced/async-sql-databases/ It's now using encode/databases for async. I don't think I can use this if I want ORM compatibility, however. ... I'm relatively new to python so forgive me if some of this is inaccurate. I'm just trying to get all of this straight in my head. The other alternative is creating and handling session begin commit / end inside each dependency that uses it as well as inside the routes. That's a lot of added code just to manage an abstraction that I don't really need and probably don't want. EDIT: I had some incorrect information here about session alternatives that I've removed. The complexity of SQLAlchemy's documentation and the confusion around using Sessions with FastAPI makes me think that I will have a lot of difficulty onboarding new developers to projects using Sessions and their ORM. |
I've been trying to go through all the related issues #104 #290 etc., and come up with a summary of what the best production worthy steps might be as of 2022. It looks like, the following apply:
@sm-Fifteen , @dmontagu could you please weigh in with your thoughts? If this is a useful summary, it might be a good idea to write this in the docs. |
When it comes to sessions, SQLAlchemy advises that you should manage their life cycle outside of the functions that contain the actual data manipulation, in order to keep them composable and ensure they can't interfere with each other's global session state when used together. Note that they also mention how transaction scope and session scope are distinct and, as far as I can tell, it's perfectly valid to commit or rollback manually within your routes, which is what I personally do.
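As a small sketch of that advice, using the stdlib `sqlite3` module as a stand-in for SQLAlchemy (the table and function names are made up for illustration): the data functions only issue statements on the connection they are given, while the caller decides when to commit or roll back, so the functions compose into a single transaction.

```python
import sqlite3


def add_user(conn: sqlite3.Connection, email: str) -> None:
    # Data manipulation only: no commit, no rollback, no connection handling.
    conn.execute("INSERT INTO users (email) VALUES (?)", (email,))


def count_users(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]


def register_pair(conn: sqlite3.Connection, first: str, second: str) -> bool:
    """The caller owns the transaction: both inserts commit or neither does."""
    try:
        add_user(conn, first)
        add_user(conn, second)
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        conn.rollback()
        return False


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (email TEXT UNIQUE)")
conn.commit()

ok = register_pair(conn, "a@example.com", "b@example.com")
# The second insert violates the UNIQUE constraint, so the first is rolled back too.
dup = register_pair(conn, "c@example.com", "a@example.com")
total = count_users(conn)
conn.close()
```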
You've already commented on tiangolo/full-stack-fastapi-template#290, so you're already aware of the problems I have with "automagic" Python thread-pooling for I/O. I hadn't heard of these issues before, but it's interesting to know that people have actually run into them. I haven't tried SQLA 1.4 yet, so I can't comment on their async support (their use of greenlets to enable support on sync drivers looks really interesting, though). It's good that non-async routes won't be needed as much with this, but users should still be aware of other ways in which they may end up blocking the event loop, since the documentation doesn't really cover these right now, to my knowledge. See #3091.
Not everyone would agree with me that it's a big deal, but if you ask me, yeah, you should have lifetime bounds on everything that's initialized outside your routes. I've yet to migrate my existing code to use the scope dict instead of global placeholders, but the former definitely encourages you to make this work the right way. |
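On the point about blocking the event loop, here is a stdlib-only sketch in which `blocking_query` is a hypothetical stand-in for a synchronous database call. Called directly inside a coroutine, it stalls every other task until it returns. Handed to the default thread pool executor, it lets the loop keep running other tasks in the meantime.

```python
import asyncio
import time


def blocking_query() -> str:
    time.sleep(0.2)  # stand-in for a synchronous DB round trip
    return "rows"


async def ticker(log: list) -> None:
    for _ in range(3):
        log.append("tick")
        await asyncio.sleep(0.01)


async def run(block_the_loop: bool) -> list:
    log: list = []
    ticks = asyncio.create_task(ticker(log))
    await asyncio.sleep(0)  # let the ticker log its first tick
    if block_the_loop:
        blocking_query()  # nothing else can run while this sleeps
    else:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, blocking_query)
    log.append("query done")
    await ticks
    return log


blocked_log = asyncio.run(run(block_the_loop=True))
threaded_log = asyncio.run(run(block_the_loop=False))
print(blocked_log)
print(threaded_log)
```

In the blocked run the remaining ticks only appear after "query done". In the threaded run they are expected to finish first, although that ordering depends on timing.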
It really scares me that the tutorial docs still state to use the standard dependency driven approach, but from the various conversations around it seems like it falls apart under production load. Do we not yet have an official path for utilizing sqlalchemy with fastapi? |
@shawnwall: That would be SQLAlchemy 1.4's async mode, though by the looks of it, the FastAPI tutorial has yet to be updated to recommend this. It would probably need to go into some detail about the kind of thread pool vs connection pool issues that have been raised in issues mentioned here (and issues that have mentioned this one), otherwise I'd foresee some people falling into that same trap even after the tutorial gets updated. |
The SQLAlchemy library comes with a scoped_session object, which effectively allows for thread-safe database sessions. Is there a reason the docs recommend using the dependency method > middleware method > just a scoped_session global?