
fix: incorrect concurrent usage of connection and transaction #546

Merged
merged 20 commits into encode:master from fix-transaction-contextvar on Jul 25, 2023

Conversation

@zevisert (Contributor) commented Apr 7, 2023

This commit should fix an issue with shared state with transactions.

The main issue is described very well by @circlingthesun here: #123 (comment). We ran into this most reliably when load testing with locust, but also hit it a few times in normal usage.

The big problem is that the @db.transaction decorator creates a single Databases.Transaction instance that, when called, produces a context manager for a transaction. In concurrent frameworks, such as ASGI servers like starlette / fastapi, it's possible to have two tasks running at once. Since the Transaction instance is shared between tasks, its instance state is effectively shared global state, and concurrent code can't trust that no race condition has occurred without some helper mechanism like a ContextVar.

Here's a mermaid diagram trying to show this:

sequenceDiagram
    actor User1
    actor User2
    participant API
    participant TXN
    API->>TXN: (startup) @db.transaction creates new shared Transaction(...) for route
    User1->>API: (1): Request to API route creates new asyncio.Task(...)
    API->>TXN: (1): Transaction.__call__ assigns self._connection
    User2->>API: (2): Request to same API route creates new asyncio.Task(...)
    API->>TXN: (2): PROBLEM -- Transaction.__call__ obtains new Databases.Connection for this asyncio.Task,<br/> like above, but overwrites self._connection
    TXN->>API: (1): PROBLEM -- Transaction.__aexit__ uses self._connection of task (2)<br/>since it was not retrieved from a ContextVar
    API->>User1: (1): Responding here closes the transaction's _connection
    TXN->>API: (2): FAULT -- Transaction.__aexit__ expects to also close self._connection, <br/>but since it also tries to retrieve the connection from self._connection instead of the ContextVar,<br/>it retrieves (1)'s already closed connection
    API->>User2: (2): FAULT -- Propagated exception results in 500 response code

All this PR does is move the shared global state of self._connection and self._transaction out of instance variables and into local variables or ContextVars.
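To make the race concrete, here's a minimal pure-asyncio sketch (no databases code; the class and variable names are illustrative only) contrasting shared instance state with a ContextVar:

```python
import asyncio
from contextvars import ContextVar


# Stand-in for the shared Transaction instance: plain attributes are
# shared across every task that touches the object.
class SharedState:
    def __init__(self) -> None:
        self._connection = None


# A ContextVar is copied per asyncio.Task, so writes are task-local.
_conn_var: ContextVar[str] = ContextVar("connection")


async def use_shared(shared: SharedState, name: str) -> str:
    shared._connection = name  # the second task overwrites the first
    await asyncio.sleep(0)     # yield so the other task runs in between
    return shared._connection


async def use_contextvar(name: str) -> str:
    _conn_var.set(name)        # isolated to this task's context
    await asyncio.sleep(0)
    return _conn_var.get()


async def main():
    shared = SharedState()
    racy = await asyncio.gather(use_shared(shared, "A"), use_shared(shared, "B"))
    safe = await asyncio.gather(use_contextvar("A"), use_contextvar("B"))
    return racy, safe


racy, safe = asyncio.run(main())
print(racy)  # ['B', 'B'] -- both tasks see the last writer's value
print(safe)  # ['A', 'B'] -- each task sees only its own value
```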

Related issues are: #123 #134
Possibly related: #340
Related meta issue: #456

@zanieb (Contributor) commented Apr 8, 2023

Welcome and thanks for contributing! For some context, I'm a member of encode but not a maintainer of this project. I'm reading this out of curiosity and just have a couple of notes about proper usage of the context var. Someone else with more context on the project will have to review the changes too.

@zevisert (Contributor Author)

Thanks for the review. I'm eager to move this forward. We use databases in one of our products, and under load testing this certainly fixes a concurrency issue. Your two comments alone were pretty insightful, so if you have another encode member who is interested in taking a look, that would be great.

I'll also try the module-level context var solution you helped me discover in our codebase to see if that can still fix the issue. As it stands, Databases was (and continues) leaking memory through its use of non-module-level ContextVars, but if I can fix that here too, then that's great!

@zevisert (Contributor Author)

Also related #155

@zanieb (Contributor) commented Apr 10, 2023

Do you have a minimal executable example I could use to explore this?

@zevisert (Contributor Author)

I think #155 should be close enough, but I'll try to get a dedicated MCVE up here today.

@zevisert (Contributor Author) commented Apr 11, 2023

@madkinsz Here's a minimal example as a test; it turns out I could get it pretty small. This fails on master, but passes perfectly fine here:

# file: tests/test_concurrent_tasks.py
import pytest
import asyncio
from tests.test_databases import async_adapter, DATABASE_URLS
from databases import Database


@pytest.mark.parametrize("database_url", DATABASE_URLS)
@async_adapter
async def test_concurrent_tasks(database_url: str):
    """
    Test concurrent tasks.
    """

    async with Database(database_url) as database:
        # This could be, e.g., a fastapi / starlette endpoint
        @database.transaction()
        async def read_items() -> dict:
            return dict(await database.fetch_one(query="SELECT 1 AS value"))

        # Run 10 concurrent **tasks**
        tasks = (asyncio.create_task(read_items()) for _ in range(10))
        responses = await asyncio.gather(*tasks, return_exceptions=True)

        # Check the responses
        assert all(isinstance(response, dict) for response in responses)
        assert all(response["value"] == 1 for response in responses)
I'm using docker compose and pytest to run this:
# file: test.docker-compose.yaml
version: "3"
services:
  postgres:
    image: postgres:14
    environment:
      POSTGRES_USER: username
      POSTGRES_PASSWORD: password
      POSTGRES_DB: testsuite
    ports:
      - 5432:5432

  mysql:
    image: mysql:5.7
    environment:
      MYSQL_USER: username
      MYSQL_PASSWORD: password
      MYSQL_ROOT_PASSWORD: password
      MYSQL_DATABASE: testsuite
    ports:
      - 3306:3306
$  docker compose --file test.docker-compose.yaml up --detach 
[+] Running 2/2
 ⠿ Container databases-mysql-1     Started
 ⠿ Container databases-postgres-1  Started   

$ git log -1 --oneline
b6eba5f (HEAD -> master, upstream/master, upstream/HEAD, github/master) Bump wheel from 0.37.1 to 0.38.1 (#524)

$ DATABASE_URLS=(
>     'sqlite:///testsuite'
>     'sqlite+aiosqlite:///testsuite'
>     'mysql://username:password@localhost:3306/testsuite'
>     'mysql+aiomysql://username:password@localhost:3306/testsuite'
>     'mysql+asyncmy://username:password@localhost:3306/testsuite'
>     'postgresql://username:password@localhost:5432/testsuite'
>     'postgresql+aiopg://username:password@127.0.0.1:5432/testsuite'
>     'postgresql+asyncpg://username:password@localhost:5432/testsuite'
> )
$ export TEST_DATABASE_URLS=$(IFS=, ; echo "${DATABASE_URLS[*]}")

$ pytest -k test_concurrent_tasks --verbose --exitfirst  # --exitfirst for brevity, they all fail or hang
==================================== test session starts =====================================
platform linux -- Python 3.10.8, pytest-7.1.2, pluggy-1.0.0 -- /opt/tooling/pyenv/versions/encode-databases/bin/python3.10
cachedir: .pytest_cache
rootdir: /code/oss/github/encode/databases
plugins: cov-3.0.0, anyio-3.6.2
collected 360 items / 352 deselected / 8 selected                                            

tests/test_concurrent_fastapi.py::test_concurrent_tasks[sqlite:///testsuite] FAILED    [ 12%]

========================================== FAILURES ==========================================
_________________________ test_concurrent_tasks[sqlite:///testsuite] _________________________

database_url = 'sqlite:///testsuite'

    @pytest.mark.parametrize("database_url", DATABASE_URLS)
    @async_adapter
    async def test_concurrent_tasks(database_url: str):
        """
        Test concurrent tasks.
        """
    
        async with Database(database_url) as database:
            # This could be, e.g., a fastapi / starlette endpoint
            @database.transaction()
            async def read_items() -> dict:
                return dict(await database.fetch_one(query="SELECT 1 AS value"))
    
            # Run 10 concurrent **tasks**
            tasks = (asyncio.create_task(read_items()) for _ in range(10))
            responses = await asyncio.gather(*tasks, return_exceptions=True)
    
            # Check the responses
E >         assert all(isinstance(response, dict) for response in responses)
E           assert False
E            +  where False = all(<generator object test_concurrent_tasks.<locals>.<genexpr> at 0x7f1b26e86420>)

tests/test_concurrent_fastapi.py:25: AssertionError
================================== short test summary info ===================================
FAILED tests/test_concurrent_fastapi.py::test_concurrent_tasks[sqlite:///testsuite] - asser...
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
======================== 1 failed, 352 deselected, 1 warning in 0.28s ========================
^CException ignored in: <module 'threading' from '/opt/tooling/pyenv/versions/3.10.8/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/opt/tooling/pyenv/versions/3.10.8/lib/python3.10/threading.py", line 1567, in _shutdown
    lock.acquire()
KeyboardInterrupt: 

$ git switch -  
Switched to branch 'fix-transaction-contextvar'
Your branch is up to date with 'github/fix-transaction-contextvar'.

$ git log -1 --oneline
3c0790d (HEAD -> fix-transaction-contextvar, github/fix-transaction-contextvar) fix: contextvar.get takes no keyword arguments

$ pytest -k test_concurrent_tasks --verbose --exitfirst
==================================== test session starts =====================================
platform linux -- Python 3.10.8, pytest-7.1.2, pluggy-1.0.0 -- /opt/tooling/pyenv/versions/encode-databases/bin/python3.10
cachedir: .pytest_cache
rootdir: /code/oss/github/encode/databases
plugins: cov-3.0.0, anyio-3.6.2
collected 360 items / 352 deselected / 8 selected                                            

tests/test_concurrent_fastapi.py::test_concurrent_tasks[sqlite:///testsuite] PASSED    [ 12%]
tests/test_concurrent_fastapi.py::test_concurrent_tasks[sqlite+aiosqlite:///testsuite] PASSED [ 25%]
tests/test_concurrent_fastapi.py::test_concurrent_tasks[mysql://username:password@localhost:3306/testsuite] PASSED [ 37%]
tests/test_concurrent_fastapi.py::test_concurrent_tasks[mysql+aiomysql://username:password@localhost:3306/testsuite] PASSED [ 50%]
tests/test_concurrent_fastapi.py::test_concurrent_tasks[mysql+asyncmy://username:password@localhost:3306/testsuite] PASSED [ 62%]
tests/test_concurrent_fastapi.py::test_concurrent_tasks[postgresql://username:password@localhost:5432/testsuite] PASSED [ 75%]
tests/test_concurrent_fastapi.py::test_concurrent_tasks[postgresql+aiopg://username:password@127.0.0.1:5432/testsuite] PASSED [ 87%]
tests/test_concurrent_fastapi.py::test_concurrent_tasks[postgresql+asyncpg://username:password@localhost:5432/testsuite] PASSED [100%]

======================= 8 passed, 352 deselected, 21 warnings in 0.68s =======================

@zevisert (Contributor Author) commented Apr 11, 2023

Rebased to sign commits; all new and old tests passing. Switched to module-level ContextVars to allow the garbage collector to clean up unreferenced objects.

Edit: And now all linting checks pass too

@zanieb (Contributor) commented Apr 16, 2023

Hey! I took some time to poke around at this today. It seems a little wild to use a ContextVar to store a dictionary that includes the asyncio.Task in it (the whole point of them is that they handle task locality for you). I'm not sure what we gain by using a context variable in this case; we can just use an instance dictionary and remove a lot of complexity.

For example, I've implemented this at master...madkinsz:example/instance-safe and it passes the MRE you provided (I only checked SQLite because I didn't want to deal with docker-compose)

Curious for your thoughts
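As a rough sketch of that instance-dictionary idea (the names here are illustrative, not the actual databases internals): connections live in a per-instance WeakKeyDictionary keyed by the running asyncio.Task, so no ContextVar is needed:

```python
import asyncio
from weakref import WeakKeyDictionary


class Database:
    """Toy model: one connection per asyncio.Task, stored on the instance."""

    def __init__(self) -> None:
        # Weak keys let a connection entry disappear once its owning task
        # is garbage collected.
        self._active_connections: WeakKeyDictionary = WeakKeyDictionary()
        self._counter = 0

    def connection(self) -> str:
        task = asyncio.current_task()
        if task not in self._active_connections:
            self._counter += 1
            self._active_connections[task] = f"conn-{self._counter}"
        return self._active_connections[task]


async def worker(db: Database):
    first = db.connection()
    await asyncio.sleep(0)
    second = db.connection()  # same task, so the same connection comes back
    return first, second


async def main():
    db = Database()
    return await asyncio.gather(*(worker(db) for _ in range(3)))


results = asyncio.run(main())
# Each pair matches (stable within a task), and the three tasks got
# three distinct connections.
print(results)
```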

@zevisert (Contributor Author)

> we can just use an instance dictionary and remove a lot of complexity

I like that take too. Looking back, I was a little too committed to getting the ContextVars to work properly. With the constraint of only using context variables at module scope, and the possibility of multiple Database instances with connections to different databases, I needed to bring the current task back into the context variable to keep things separated correctly. I think what you're showing, storing it on the instance and using asyncio.current_task instead of a ContextVar altogether, should work just fine!

I'll give it a go with the other databases right now, and in the project where we initially ran into this problem.

@zevisert (Contributor Author)

Yeah, this seems great. I've adopted/merged your changes into this PR to accompany the new tests I'm contributing here. I kept a few of the assertions I had added when resolving conflicts, since I saw a few of them in the existing code. Let me know what encode's stance is on assert, as well as del some_dict[some_key] vs some_dict.pop(some_key), as it seems like we have different styles there too.

Thanks for your help on this; I'm excited for sentry to finally stop notifying us about the errors from this issue!

@zanieb (Contributor) commented Apr 18, 2023

Glad you think it makes sense too!

I think my last concern is the event-loop compatibility bit (e.g. trio).

@tomchristie Do you think you'd be willing to give this a look soon or point me to a better reviewer?

@zevisert (Contributor Author)

I don't think Databases supports other event loop implementations like trio, curio, etc. I see an old PR #357 that was working on adding anyio support, but it's still in draft.

@zevisert zevisert changed the title fix: incorrect concurrent usage of connection and transaction draft: fix: incorrect concurrent usage of connection and transaction May 26, 2023
@zevisert zevisert marked this pull request as draft May 26, 2023 01:59
@zevisert (Contributor Author)

Okay, thanks to @reclosedev's issue #134, I've circled back around to thinking that users probably don't want implicit connection inheritance across tasks, BUT do want transaction inheritance if they explicitly reuse the same connection among descendant tasks.

Their example boils down to this:

import os
import asyncio
import databases

database = databases.Database(os.environ["DB_URL"])

async def worker():
    async with database.transaction():
        await database.execute("SELECT 1 + 1")

async def main():
    await database.connect()
    await database.execute("SELECT 1 + 1")
    await asyncio.gather(*[worker() for _ in range(5)])

asyncio.run(main())

That is:

  1. Instantiate a Database and .connect to set up a connection pool with the database backend. (.connect is, and has always been, lazy; actual connections are established by database.connection())
  2. Do something that interacts with the database, causing a connection to be created for the parent task.
  3. Spawn a bunch of workers to do independent work, notably not from within a transaction.
  • This sequence of actions is very similar to a typical FastAPI user (or other web frameworks). They create a database instance that maybe checks for a given migration state on startup, then serves requests in their own tasks.

In the current version of this PR (and on master) this example is still broken because we are using ContextVars such that the current connection is inherited by all descendant tasks. The descendant tasks cannot do parallel work within transactions, because they all share the same connection - and a single backend connection can only support nested transactions, not concurrent transactions.
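The "nested but not concurrent" limitation is easy to demonstrate with the stdlib sqlite3 module (this is plain sqlite3 for illustration, not the databases API):

```python
import sqlite3

# One connection, manual transaction control.
conn = sqlite3.connect(":memory:", isolation_level=None)

conn.execute("BEGIN")
try:
    # A second top-level transaction on the same connection fails:
    conn.execute("BEGIN")
    error = None
except sqlite3.OperationalError as exc:
    error = str(exc)  # "cannot start a transaction within a transaction"

# Nesting works fine via savepoints, which is what a transaction
# stack on the connection models.
conn.execute("SAVEPOINT sp1")
conn.execute("RELEASE sp1")
conn.execute("COMMIT")
print(error)
```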


This all makes me think that the best solution is for each Database instance to have an attribute that tracks connections in use per task, via something like what I had been playing with in 8370299:

class Database:
    _active_connections: WeakKeyDictionary[asyncio.Task, Connection]

This change would make connections task-local with no inheritance. Each new task acquires a new connection from the backend's connection pool unless a connection is explicitly provided to the task. The implicit connection inheritance to descendant tasks is what seems to be the cause of most user frustration with concurrent code. #230 explicitly calls this out.

TransactionBackends, on the other hand, should still be tracked with context variables so that transactions nest when the same connection is used; that's the whole point of Connection._transaction_stack anyway. I roughly suggested this pattern earlier when responding to @ToGoBananas' example: at the time we had no context variables at all, so child tasks did not inherit connections, and he was looking for support for the case where a child task needed to influence the state of a transaction in the parent. My suggestion was to explicitly pass the connection with the open transaction to the descendant task. Passing the connection brings the state of any active transactions on that connection along with it.
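A toy model of that contract (illustrative names, not the real Connection class): the transaction stack lives on the connection object, so handing the same connection to a child task shares the open transaction, while a fresh connection starts clean:

```python
import asyncio


class Connection:
    """Toy connection carrying its own transaction stack."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._transaction_stack: list[str] = []


async def child(conn: Connection) -> list[str]:
    # The child nests inside whatever transaction the connection carries.
    conn._transaction_stack.append("child-txn")
    return list(conn._transaction_stack)


async def main():
    inherited = Connection("shared")
    inherited._transaction_stack.append("parent-txn")  # parent opened a txn
    shared_view = await asyncio.create_task(child(inherited))

    independent = Connection("fresh")  # a task acquiring its own connection
    fresh_view = await asyncio.create_task(child(independent))
    return shared_view, fresh_view


shared_view, fresh_view = asyncio.run(main())
print(shared_view)  # ['parent-txn', 'child-txn'] -- nested in the parent's txn
print(fresh_view)   # ['child-txn'] -- no inherited transaction state
```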

@zevisert (Contributor Author)

Oh, and another thing I found just now with the current implementation: the context copied for a new task is a shallow copy, so a dictionary object stored in a context variable is shared with the parent. That means we had been undoing any isolation the context variables provided by mutating the WeakKeyDictionary if it had been created before any descendant tasks started. Simple fix though: just treat those dictionaries as immutable.
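A small sketch of the shallow-copy hazard (plain contextvars, no databases code): mutating a dict stored in a ContextVar leaks into the parent task, while copy-on-write keeps the isolation:

```python
import asyncio
from contextvars import ContextVar

_state: ContextVar[dict] = ContextVar("state")


async def mutating_child():
    # In-place mutation: the child's context holds the *same* dict
    # object as the parent's, so this edits the parent's state too.
    _state.get()["owner"] = "child"


async def copying_child():
    # Copy-on-write: set() only rebinds the var in this task's context.
    _state.set({**_state.get(), "owner": "child"})


async def main():
    _state.set({"owner": "parent"})
    await asyncio.create_task(mutating_child())
    leaked = _state.get()["owner"]    # "child": isolation was broken

    _state.set({"owner": "parent"})
    await asyncio.create_task(copying_child())
    isolated = _state.get()["owner"]  # "parent": parent was unaffected
    return leaked, isolated


leaked, isolated = asyncio.run(main())
print(leaked, isolated)
```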

Connections are once again stored as state on the Database instance,
keyed by the current asyncio.Task. Each task acquires its own
connection, and a WeakKeyDictionary allows the connection to be
discarded if the owning task is garbage collected. TransactionBackends
are still stored as contextvars, and a connection must be explicitly
provided to descendant tasks if active transaction state is to be
inherited.
@zevisert zevisert force-pushed the fix-transaction-contextvar branch from bc28059 to b94f097 Compare May 26, 2023 22:46
@zevisert zevisert marked this pull request as ready for review May 26, 2023 22:47
@zevisert zevisert changed the title draft: fix: incorrect concurrent usage of connection and transaction fix: incorrect concurrent usage of connection and transaction Jun 1, 2023
@zevisert (Contributor Author) commented Jul 4, 2023

Anything more I can do for anyone here to get this reviewed and released?

@zanieb (Contributor) left a review comment
I think this has landed in a good place. However, I am not a frequent maintainer of this repository and we'll need someone else to look it over before it can be merged.

@zanieb zanieb requested review from tomchristie and aminalaee and removed request for circlingthesun July 12, 2023 00:56
@zanieb (Contributor) commented Jul 25, 2023

Thanks for your patience!

@zanieb zanieb merged commit 25fa295 into encode:master Jul 25, 2023
4 checks passed
@zevisert (Contributor Author) commented Jul 31, 2023

Thanks for the merge! Could we get @Kludex or @aminalaee to cut a new release so that we can actually have this bug corrected in our codebases? Is there a changelog or blog article I can help write for this? Is there a release cadence or policy that my team can plan around?

@zanieb (Contributor) commented Jul 31, 2023

@zevisert as far as I can tell, I can make a release. I do not think anyone else is available to maintain this project.

I'd definitely appreciate if you opened a pull request for a changelog entry at https://github.com/encode/databases/blob/master/CHANGELOG.md

There is no planned release cadence for this project. I'd like to release this change and some other fixes but there is little active development here.
