Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unable to make concurrent queries with transactions #116

Closed
LKay opened this issue Jun 27, 2019 · 7 comments
Closed

Unable to make concurrent queries with transactions #116

LKay opened this issue Jun 27, 2019 · 7 comments
Assignees

Comments

@LKay
Copy link

LKay commented Jun 27, 2019

Recently there was #81 resolved that enabled concurrent queries, but it only partially solves the problem. If queries are wrapped in transactions:

async with dtabase.transaction():
   ...

and executed in parallel the introduced lock doesn't work resulting in:

asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
@tomchristie
Copy link
Member

Could you expand this into a complete example?

We got a pretty simple concurrency test case, so the first thing to progress this would be adding another test case demonstrating this failure case.

@otsuka
Copy link

otsuka commented Jan 13, 2020

Related to #134

@tomchristie
When using PostgreSQL, the test code below reproduces the error in my machine.
test_single_transaction() suceeds, but test_concurrent_transactions() always fails and the test process does not return.

I would really appreciate it if you could fix this problem.

database = databases.Database(
    url=postgresql_dsn,
    force_rollback=True,
    min_size=1,
    max_size=2,
)


class TestConcurrentTransaction:

    @pytest.fixture
    async def db(self):
        await database.connect()
        yield
        await database.disconnect()

    @pytest.mark.asyncio
    async def test_single_transaction(self, db):
        await service_method()
       
    @pytest.mark.asyncio
    async def test_concurrent_transactions(self, db):
        concurrent_num = 2
        coros = []
        for i in range(concurrent_num):
            coro = service_method()
            coros.append(coro)
        await asyncio.gather(*coros)


@database.transaction()
async def service_method():
    await repository_query_method()


async def repository_query_method():
    sel = "SELECT CURRENT_TIMESTAMP"
    row = await database.fetch_one(sel)
    print(row['current_timestamp'])

Error messages are as follows:

    @pytest.mark.asyncio
    @pytest.mark.use_db
    async def test_concurrent_transactions(self, db):
        concurrent_num = 2
        coros = []
        for i in range(concurrent_num):
            coro = service_method()
            coros.append(coro)
>       await asyncio.gather(*coros)

test_concurrent.py:37: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/Users/otsuka/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py:292: in __wakeup
    future.result()
/Users/otsuka/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py:223: in __step
    result = coro.send(None)
../../../../.venv/lib/python3.7/site-packages/databases/core.py:320: in wrapper
    async with self:
../../../../.venv/lib/python3.7/site-packages/databases/core.py:290: in __aenter__
    await self.start()
../../../../.venv/lib/python3.7/site-packages/databases/core.py:329: in start
    await self._transaction.start(is_root=is_root)
../../../../.venv/lib/python3.7/site-packages/databases/backends/postgres.py:217: in start
    await self._transaction.start()
../../../../.venv/lib/python3.7/site-packages/asyncpg/transaction.py:138: in start
    await self._connection.execute(query)
../../../../.venv/lib/python3.7/site-packages/asyncpg/connection.py:272: in execute
    return await self._protocol.query(query, timeout)
asyncpg/protocol/protocol.pyx:301: in query
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???
E   asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

asyncpg/protocol/protocol.pyx:664: InterfaceError

@trvrmcs
Copy link

trvrmcs commented Jul 22, 2020

I'm running into the same issue.

This is the simplest code I can come up with that demonstrates this error:

import asyncio
import databases


db = databases.Database("postgresql://example:password@localhost/example")


async def query():
    async with db.transaction():
        await db.fetch_one("SELECT 1")


async def test():
    await db.connect()
    await query()
    await asyncio.gather(query(), query())


if __name__ == "__main__":

    loop = asyncio.get_event_loop()
    loop.run_until_complete(test())

which causes the following exception

...
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

A few things to note:

  • If I remove the first call to query, the code passes
  • If I remove the call to gather, the code passes.
  • If I replace the line inside the transaction context manager with pass, the code still fails, but with a different error.

@encode encode deleted a comment from cyberbudy Oct 1, 2020
@encode encode deleted a comment from ToGoBananas Oct 1, 2020
@arauter
Copy link

arauter commented Jan 18, 2021

Any progress on this open ticket? I am facing the same issue.

@trvrmcs
Copy link

trvrmcs commented Jan 21, 2021

I've outlined the underlying cause of this problem in #230. Fixing it requires a fundamental change to how the library stores concurrent connections - I think the existing mechanism using ContextVar is fundamentally broken. I've had to stop using databases because of this. I'd be happy to collaborate on a solution, but I don't know enough about task-specific storage to know what the correct fix should be.

@vmarkovtsev
Copy link
Contributor

vmarkovtsev commented Jan 21, 2021

Fellows, please check my workarounds: #176 (comment)

  • I am the current maintainer of the project.
  • I am using those workarounds in my production.

These two points guarantee that the workarounds will not break 😄 If somebody makes a PR from them, I'll happily merge it.

@aminalaee
Copy link
Member

I think #328 fixed this and I'll close this.

@aminalaee aminalaee self-assigned this Aug 17, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

7 participants