Errors with multiple tasks and transactions in postgresql and sqlite #134

Closed
reclosedev opened this issue Aug 9, 2019 · 4 comments
Comments

@reclosedev

Minimal reproduction script, with comments:

import asyncio

import databases
import sqlalchemy as sa

# import logging
# logging.basicConfig(level=logging.DEBUG)


DB_URL = "sqlite:////tmp/test.db"

metadata = sa.MetaData()
table = sa.Table(
    "t1", metadata,
    sa.Column("id", sa.Integer()),
    sa.Column("data", sa.String()),
)
database = databases.Database(DB_URL)


async def init_db():
    engine = sa.create_engine(DB_URL)
    metadata.create_all(engine)
    await database.connect()


async def work(start: int, n=5, data="a"):
    async with database.transaction():
        query = table.insert()
        for i in range(start, n):
            await database.execute(query, {"id": i, "data": data})
    print(f"Ok {start}")


async def main_async():
    await init_db()
    futures = []
    some_query = table.select()
    # Error is raised if there is query executed before creating futures
    # Doesn't matter if transaction is explicit or not
    async with database.transaction():
        # !!! If I remove this query (from parent task), it works fine
        await database.fetch_all(some_query)
    step = 5
    for i in range(0, 4 * step, step):
        fut = asyncio.ensure_future(work(i, i + step))
        futures.append(fut)
    await asyncio.wait(futures)
    print("Done")


loop = asyncio.get_event_loop()
loop.run_until_complete(main_async())

Error with sqlite database:

databases/core.py", line 305, in commit
assert self._connection._transaction_stack[-1] is self

Error with postgresql database:

  File "asyncpg/protocol/protocol.pyx", line 301, in query
  File "asyncpg/protocol/protocol.pyx", line 659, in asyncpg.protocol.protocol.BaseProtocol._check_state
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
@reclosedev
Author

It looks like the root cause of this issue is how contextvars works. I've tried both aiocontextvars on Python 3.6 and the standard library version on Python 3.7.

If a context variable is set in the outer scope, every task created afterwards inherits the same value, so the new tasks reuse the parent's connection instead of creating their own.

If you change the task-creation loop in the reproduction script to:

    new_context = contextvars.Context()
    for i in range(0, 4 * step, step):
        fut = new_context.run(asyncio.ensure_future, work(i, i + 5))
        futures.append(fut)
    await asyncio.wait(futures)

then it works as expected. But I don't think that it's a good solution.
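The inheritance described above can be reproduced with the standard library alone. The following is a minimal sketch, not code from databases: `current_connection` and the string `"parent-connection"` are hypothetical stand-ins for whatever the library stores per context. It shows that tasks created normally see the parent's value, while tasks created inside an empty `contextvars.Context` do not.

```python
import asyncio
import contextvars

# Hypothetical stand-in for a per-context connection slot (an assumption,
# not the actual variable used by the databases library).
current_connection = contextvars.ContextVar("current_connection", default=None)


async def child(name):
    # A task sees a copy of the context that was current when it was created.
    return name, current_connection.get()


async def main():
    # The parent sets the variable before spawning tasks, much like running
    # a query in the parent task before creating the futures.
    current_connection.set("parent-connection")

    # Plain task creation: each task copies the parent's context.
    inherited = await asyncio.gather(
        asyncio.ensure_future(child("a")),
        asyncio.ensure_future(child("b")),
    )

    # Task creation inside an empty Context: each task copies the empty
    # context instead, so the parent's value is not visible.
    empty = contextvars.Context()
    isolated = await asyncio.gather(
        empty.run(asyncio.ensure_future, child("c")),
        empty.run(asyncio.ensure_future, child("d")),
    )
    return inherited, isolated


inherited, isolated = asyncio.run(main())
print(inherited)
print(isolated)
```

The first list carries the parent's value in every task; the second falls back to the default of `None`, which is why the workaround above ends up with a fresh connection per task.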

@jarppe

jarppe commented Oct 27, 2019

Perhaps my problem is related, but I don't get any errors. In my case the workers just get stuck.

import asyncio
from databases import Database


def database():
    return Database("postgresql://",
                    host="db",
                    port=5432,
                    min_size=1,
                    max_size=3,
                    user="tiger",
                    password="hunter2",
                    database="test")


async def insert_something(db: Database):
    async with db.connection() as conn:
        async with db.transaction():
            await db.execute("insert into person (name) values (:name)", {"name": "testing..."})


async def query_something(db: Database, n):
    async with db.connection() as conn:
        async with db.transaction():
            row = await db.fetch_one("select :foo as foo", {"foo": "bar"})
            print(f"{n} done")


async def run():
    async with database() as db:

        # If this is commented out then the app runs successfully:
        await insert_something(db)

        tasks = []
        for n in range(10):
            tasks.append(asyncio.create_task(query_something(db, n)))

        print("Waiting...")
        await asyncio.gather(*tasks)
        print("Done")


if __name__ == "__main__":
    asyncio.run(run())

When I run this, 4 or 5 workers print "done" and then the app gets stuck. If I comment out the insert_something call, all the workers run and the application terminates.

The size of the pool does not matter. Also I tried using db.transaction() but that does not seem to matter either.

@ojii

ojii commented Apr 16, 2020

I have the same issue. Running two Database.iterate calls with asyncio.gather(...) causes asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress.

@zevisert
Contributor

I'm quite certain this is the same underlying issue as #125, #424, #452, and the discussion in #456. It all comes back to how ContextVars have been used in this library, and @reclosedev was totally on the right track all the way back in 2019. I believe I have fixed this in #546. The gist is that only one parent transaction can be active at a time per connection (nested transactions are okay). In #546, I'm proposing that databases acquire a new connection from the connection pool for each asyncio.Task. If concurrent work needs to happen on the same connection or within the same transaction, users should pass the connection instance to those child tasks.
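To illustrate the idea of one connection per asyncio.Task, here is a rough standard-library sketch. It is not the implementation from #546: `FakeConnection`, `connection_for_current_task`, and the weak-keyed registry are all hypothetical names invented for this example, and no real pool is involved. It only shows that keying connections by the current task isolates child tasks by default, while explicitly passing a connection still lets them share one.

```python
import asyncio
import itertools
import weakref

_ids = itertools.count(1)


class FakeConnection:
    """Hypothetical stand-in for a pooled connection (not part of databases)."""

    def __init__(self):
        self.id = next(_ids)


# One connection per asyncio.Task. Weak keys let an entry disappear once
# its task has finished and been garbage collected.
_connections = weakref.WeakKeyDictionary()


def connection_for_current_task():
    # Look up (or lazily create) the connection owned by the running task.
    task = asyncio.current_task()
    if task not in _connections:
        _connections[task] = FakeConnection()
    return _connections[task]


async def independent_worker():
    # Each child task gets its own connection by default.
    return connection_for_current_task().id


async def sharing_worker(conn):
    # Sharing is opt-in: the parent hands its connection to the child.
    return conn.id


async def main():
    parent = connection_for_current_task()
    own = await asyncio.gather(*(independent_worker() for _ in range(3)))
    shared = await asyncio.gather(*(sharing_worker(parent) for _ in range(3)))
    return parent.id, own, shared


parent_id, own_ids, shared_ids = asyncio.run(main())
print(parent_id, own_ids, shared_ids)
```

In this sketch the three independent workers each report a different connection id, none equal to the parent's, while the sharing workers all report the parent's id.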
