Don't use fastapi Depends for database sessions #136

Closed
costrouc opened this issue Oct 31, 2023 · 13 comments

costrouc commented Oct 31, 2023

Bug description

I just saw https://github.com/Quansight/ragna/blob/main/ragna/_api/core.py#L104. This is the exact issue that we had in conda-store and that Nikita found: Depends does not reliably terminate the session when the request ends, and FastAPI doesn't guarantee when a dependency will be torn down. In conda-store the fix was to create the session explicitly with a context manager or middleware. There is a deep dive in this PR: conda-incubator/conda-store#622
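
For reference, the middleware variant mentioned above could look roughly like the sketch below; the engine URL, SessionLocal, and the wiring are placeholders for illustration, not conda-store's or ragna's actual code.

from fastapi import FastAPI, Request
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("sqlite:///example.db")  # placeholder URL
SessionLocal = sessionmaker(bind=engine)

app = FastAPI()


@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    # Create the session explicitly per request and guarantee it is closed
    # when the response is done, independent of dependency teardown.
    session = SessionLocal()
    request.state.db = session
    try:
        response = await call_next(request)
    finally:
        session.close()
    return response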

How to reproduce the bug?

Open a bunch of API requests concurrently and the database connection pool will become exhausted. I don't have a test for this; I can only report that this issue happened in conda-store.

Versions and dependencies used

No response

Anything else?

No response

costrouc added the type: bug 🐛 Something isn't working label Oct 31, 2023

pmeier commented Oct 31, 2023

I did a deep dive. The dependency handling happens here and the actual call here.

async def solve_generator(
    *, call: Callable[..., Any], stack: AsyncExitStack, sub_values: Dict[str, Any]
) -> Any:
    if is_gen_callable(call):
        cm = contextmanager_in_threadpool(contextmanager(call)(**sub_values))
    elif is_async_gen_callable(call):
        cm = asynccontextmanager(call)(**sub_values)
    return await stack.enter_async_context(cm)

So FastAPI does wrap the dependencies into a context manager and thus should terminate the session correctly.
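
For context, this is the kind of generator dependency that solve_generator wraps; a minimal sketch with placeholder names (get_session, SessionLocal, the /ping endpoint), not ragna's actual code.

from typing import Iterator

from fastapi import Depends, FastAPI
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session, sessionmaker

engine = create_engine("sqlite:///example.db")
SessionLocal = sessionmaker(bind=engine)

app = FastAPI()


def get_session() -> Iterator[Session]:
    # FastAPI passes this generator through solve_generator above, so the
    # finally block should run when the request is torn down.
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()


@app.get("/ping")
def ping(session: Session = Depends(get_session)) -> dict:
    return {"result": session.execute(text("SELECT 1")).scalar()}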

@nkaretnikov

@pmeier I haven't looked at what ragna does. But here's context from conda-store:


pmeier commented Oct 31, 2023

Thanks a ton for the links @nkaretnikov! I think the relevant write-up is in tiangolo/fastapi#3205 (comment). TL;DR: the deadlock happens because certain SQLAlchemy operations like Session().execute block if the connection pool is exhausted. However, the old sessions can never be terminated, because that needs yet another thread from the same pool. Hence, deadlock.

As reported there, using a context manager inside the endpoint solves the issue and is the easiest fix to implement. I'll do that in the short term.

However, since we are past sqlalchemy==1.4.0

"sqlalchemy>=2",

we probably want to use the async session anyway. With that, we should be able to circumvent the issue as well.
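
A minimal sketch of what the async-session route could look like with SQLAlchemy's asyncio extension; the aiosqlite URL and the get_async_session helper are assumptions for illustration, not anything ragna currently ships.

from contextlib import asynccontextmanager

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

# Placeholder URL; with PostgreSQL this would be an asyncpg URL instead.
engine = create_async_engine("sqlite+aiosqlite:///example.db")
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)


@asynccontextmanager
async def get_async_session():
    # The session is awaited on the event loop, so no threadpool thread is
    # held while waiting for the database.
    async with AsyncSessionLocal() as session:
        yield session
        await session.commit()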

pmeier self-assigned this Oct 31, 2023
@nkaretnikov

@pmeier Note: I haven't tried using async with our application, only with some testing code, so I'm not sure whether it'll help. But it might also require switching to an async protocol for the DB and wrapping all your DB calls in async/await, which is why I didn't want to do it. E.g., we also use celery in some places and I didn't want that running in the async loop.


pmeier commented Oct 31, 2023

But it might also require switching to an async protocol for the DB and wrapping all your DB calls in async/await

All our endpoints are async anyway so making the DB calls async should not be an issue:

ragna/ragna/_api/core.py

Lines 102 to 112 in d4ee0ee

@app.get("/document")
async def get_document_upload_info(
    session: SessionDependency,
    user: UserDependency,
    name: str,
) -> schemas.DocumentUploadInfo:
    document = schemas.Document(name=name)
    url, data, metadata = await config.core.document.get_upload_info(
        config=config, user=user, id=document.id, name=document.name
    )
    database.add_document(session, user=user, document=document, metadata=metadata)

It should just become await database.add_document(...), plus the corresponding changes in the actual function.
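
For illustration only, this is the general shape of such a change; the Document model and the add_document signature below are made up for the sketch and are not ragna's actual database module.

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class Document(Base):
    # Made-up table for the sketch; the real schema lives elsewhere.
    __tablename__ = "documents"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]


async def add_document(session: AsyncSession, *, name: str) -> Document:
    document = Document(name=name)
    session.add(document)
    await session.commit()  # the formerly blocking commit is now awaited
    await session.refresh(document)
    return document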

we also use celery in some places and I didn't want that running in the async loop.

In Ragna, the DB is completely detached from the task queue (although that is also running async) so this is not an issue.


pmeier commented Oct 31, 2023

According to tiangolo/fastapi#3205 (comment) this was solved in tiangolo/fastapi#5122. The commit was included starting in fastapi==0.82.0. I see that fastapi is unpinned in conda-store-server. @nkaretnikov Do you recall what version you tested this against? fastapi==0.82.0 is now roughly one year old, so maybe you are just stuck on an old version?

Removing the bug label for now, since after the PR was merged, there were no more comments on the same thread indicating that the fix didn't work. The PR itself has a repro, so I'm going to try that to see if there is actually a problem here.

pmeier added the needs: investigation 🔎 label and removed the type: bug 🐛 Something isn't working label Oct 31, 2023
@nkaretnikov

@pmeier I cannot provide the version information because it's not pinned and I recreated my env recently. So I won't be able to tell whether I'm using the same version I used while fixing this issue in conda-store.


pmeier commented Oct 31, 2023

I've run the following script against main / sqlite and the API / DB handles this just fine:

from threading import Thread

import httpx


def main(n=1_000):
    client = get_client()

    threads = [
        Thread(target=get_document, args=(client, f"document_{idx}.txt"))
        for idx in range(n)
    ]

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()


def get_document(client, name):
    return client.get("/document", params={"name": name})


def get_client():
    client = httpx.Client(base_url="http://127.0.0.1:31476")
    client.get("/").raise_for_status()

    username = password = "user"
    token = (
        (
            client.post(
                "/token",
                data={"username": username, "password": password},
            )
        )
        .raise_for_status()
        .json()
    )
    client.headers["Authorization"] = f"Bearer {token}"

    return client


if __name__ == "__main__":
    main()

Meaning, roughly 1000 concurrent requests seem to be fine. That being said, @costrouc noted that sqlite might be special-cased in SQLAlchemy (conda-incubator/conda-store#622 (review)). Thus, I need to rerun this against PostgreSQL or the like.

@nkaretnikov

My 2c:

  • We saw this problem with PostgreSQL.
  • I remember SQLite being treated specially in the context of sessions in SQLAlchemy, but I cannot find the docs right now for some reason.
  • SQLite has other issues which can lead to locking: https://www2.sqlite.org/cvstrac/wiki?p=DatabaseIsLocked
  • In your test code you use threading while I used async; I'm not sure whether it matters with respect to how fast requests can be sent out (see the asyncio sketch after this list).
  • In our application, it reproduced on endpoints that used complex queries (fetching, filtering, sorting, etc.), which I think made it easier to repro.
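
For comparison, here is a quick asyncio variant of the script above (same endpoint, port, and credentials as the threading version); whether it changes the outcome is untested.

import asyncio

import httpx


async def get_document(client: httpx.AsyncClient, name: str) -> httpx.Response:
    return await client.get("/document", params={"name": name})


async def main(n: int = 1_000) -> None:
    async with httpx.AsyncClient(base_url="http://127.0.0.1:31476") as client:
        (await client.get("/")).raise_for_status()

        response = await client.post(
            "/token", data={"username": "user", "password": "user"}
        )
        response.raise_for_status()
        token = response.json()
        client.headers["Authorization"] = f"Bearer {token}"

        # Fire all requests concurrently on the event loop.
        await asyncio.gather(
            *(get_document(client, f"document_{idx}.txt") for idx in range(n))
        )


if __name__ == "__main__":
    asyncio.run(main())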

pmeier changed the title from "[BUG] - Don't use fastapi Depends for database sessions" to "Don't use fastapi Depends for database sessions" Nov 1, 2023

v3ss0n commented Nov 2, 2023

We faced that along with several other problems and moved away to the much better maintained Litestar. Give it a try:
https://Litestar.dev


pmeier commented Nov 2, 2023

@v3ss0n Apart from this (potential!) wrinkle, FastAPI has been working well for us. I've never used Litestar so I can't really tell if it is "better" (in whatever sense) for our use case. If you feel strongly about this, please open a dedicated issue. However, until we have better test coverage, we won't swap a major dependency.


pmeier commented Mar 18, 2024

Closing this, as we are no longer doing it anyway. Since a few of our endpoints take a relatively long time to process, i.e. answer and prepare, we don't want to keep the database connection open for the whole time. Meaning, we use the context manager twice: first to retrieve something from the database, and later again to store the results of the computation:

with get_session() as session:
    chat = database.get_chat(session, user=user, id=id)
    chat.messages.append(
        schemas.Message(content=prompt, role=ragna.core.MessageRole.USER)
    )
    core_chat = schema_to_core_chat(session, user=user, chat=chat)

with get_session() as session:
    chat.messages.append(answer)
    database.update_chat(session, user=user, chat=chat)
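
For completeness, get_session() in the snippet above could be as simple as the following sketch; this is an illustration with a placeholder engine setup, not necessarily ragna's actual helper.

from contextlib import contextmanager
from typing import Iterator

from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker

engine = create_engine("sqlite:///ragna.db")  # placeholder URL
SessionLocal = sessionmaker(bind=engine)


@contextmanager
def get_session() -> Iterator[Session]:
    # Each with-block above opens and closes its own short-lived session, so
    # no connection is held across the long-running parts of the endpoint.
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()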
