This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Garbage collector cleans up pool during creation in ASGI server #878

Closed
MatthewScholefield opened this issue Jan 1, 2021 · 13 comments
Labels
need investigation Need to look into described issue.

Comments

@MatthewScholefield

MatthewScholefield commented Jan 1, 2021

I've been tracking down an exception in my FastAPI backend that occurred roughly once every 20 startups. This led me down a deep rabbit hole of debugging that ended up uncovering the following error when using aioredis with an ASGI server:

example.py:

import aioredis

pool = None

async def app(scope, receive, send):
    global pool
    if scope['type'] == 'lifespan':
        message = await receive()  # On startup
        pool = await aioredis.create_redis_pool('redis://localhost:6379')
        await send({"type": "lifespan.startup.complete"})
        message = await receive()  # Wait until shutdown
    else:
        await pool.ping()  # (Use pool during requests)

When running this with uvicorn example:app, everything seems to work (the app starts up correctly), but if we force garbage collection at a specific point inside the event loop, we consistently encounter the following error:

Task was destroyed but it is pending!
task: <Task pending name='Task-3' coro=<RedisConnection._read_data() running at .../site-packages/aioredis/connection.py:186> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fb4af031f70>()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at .../site-packages/aioredis/connection.py:168]>
Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<LifespanOn.main() running at .../site-packages/uvicorn/lifespan/on.py:55> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fb4aefbf070>()]>>

While a bit hacky, we can force this garbage collection in the event loop as follows:

  • Edit /usr/lib/python3.*/asyncio/base_events.py and, within def _run_once, near the bottom immediately inside the for i in range(ntodo): loop, add import gc; gc.collect() (a sketch of the modified loop is shown below).
  • Force Uvicorn to use the asyncio event loop so that it runs this modified code: uvicorn example:app --loop asyncio

After doing this, we see the above error every single startup.
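For reference, the modified section of _run_once looks roughly like this. This is paraphrased from CPython 3.8's asyncio (the exact surrounding code varies between Python versions), so treat it as a sketch of the debugging hack rather than a verbatim patch:

# asyncio/base_events.py, inside BaseEventLoop._run_once (paraphrased, not verbatim)
ntodo = len(self._ready)
for i in range(ntodo):
    # Debugging hack: force a collection before every ready callback runs, so any
    # object kept alive only by an unreferenced pending Task is reclaimed here.
    import gc; gc.collect()

    handle = self._ready.popleft()
    if handle._cancelled:
        continue
    handle._run()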

Notes

  • Note, since garbage collection can run at any time, this bug will appear randomly in real-world situations (which is what initially started this investigation).
  • This error occurs even without the gc.collect() modification if we write await create_redis_pool(...) instead of pool = await create_redis_pool(...). I think this might be expected, though, because we are awaiting an rvalue.
  • Strangely, without running in uvicorn (i.e. passing dummy functions to receive and send), the error doesn't appear.
  • Additionally, it doesn't happen when using hypercorn.

I'm hesitant to say it's an error with uvicorn, however, because hypercorn isn't doing anything but calling the handlers either. Perhaps hypercorn holds on to some extra references, which is why the error doesn't happen there?

Does anyone have any insight on this (specifically, the RedisConnection._read_data() task running at .../site-packages/aioredis/connection.py:186)? According to this SO post, there can be some weirdness when awaiting futures without hard references. If it seems to be an issue with Uvicorn, I can close this and create it there.

@seandstewart
Collaborator

@MatthewScholefield this is something I've run into as well, and was resolved by using explicit connections from the pool during the app lifetime, then explicitly closing the pool on app shutdown.
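For the lifespan-based example above, that pattern looks roughly like this. It is only a minimal sketch against the aioredis 1.x API (error handling kept to a minimum):

import aioredis

pool = None

async def app(scope, receive, send):
    global pool
    if scope["type"] == "lifespan":
        while True:
            message = await receive()
            if message["type"] == "lifespan.startup":
                # Keep a hard reference to the pool for the whole app lifetime.
                pool = await aioredis.create_redis_pool("redis://localhost:6379")
                await send({"type": "lifespan.startup.complete"})
            elif message["type"] == "lifespan.shutdown":
                # Explicitly close the pool and wait for its connections to be released.
                pool.close()
                await pool.wait_closed()
                await send({"type": "lifespan.shutdown.complete"})
                return
    else:
        await pool.ping()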

@waketzheng
Contributor

@MatthewScholefield We also got an exception in a Sanic backend:

source_traceback: Object created at (most recent call last):
  File "ws/app.py", line 220, in main
    app.run(host="0.0.0.0", port=port, debug=True)
  File "/home/ubuntu/.local/share/virtualenvs/websocketduli--KNzsfn6/lib/python3.8/site-packages/sanic/app.py", line 1170, in run
    serve(**server_settings)
  File "/home/ubuntu/.local/share/virtualenvs/websocketduli--KNzsfn6/lib/python3.8/site-packages/sanic/server.py", line 856, in serve
    loop.run_forever()
  File "ws/app.py", line 152, in server
    conn = await create_redis(REDIS_URL)
  File "/home/ubuntu/.local/share/virtualenvs/websocketduli--KNzsfn6/lib/python3.8/site-packages/aioredis/commands/__init__.py", line 168, in create_redis
    conn = await create_connection(address, db=db,
  File "/home/ubuntu/.local/share/virtualenvs/websocketduli--KNzsfn6/lib/python3.8/site-packages/aioredis/connection.py", line 128, in create_connection
    conn = cls(reader, writer, encoding=encoding,
  File "/home/ubuntu/.local/share/virtualenvs/websocketduli--KNzsfn6/lib/python3.8/site-packages/aioredis/connection.py", line 162, in __init__
    self._reader_task = asyncio.ensure_future(self._read_data())
task: <Task pending name='Task-4667' coro=<RedisConnection._read_data() running at /home/ubuntu/.local/share/virtualenvs/websocketduli--KNzsfn6/lib/python3.8/site-packages/aioredis/connection.py:186> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f992972ee80>()] created at /usr/local/lib/python3.8/asyncio/streams.py:515> cb=[RedisConnection.__init__.<locals>.<lambda>() at /home/ubuntu/.local/share/virtualenvs/websocketduli--KNzsfn6/lib/python3.8/site-packages/aioredis/connection.py:168] created at /home/ubuntu/.local/share/virtualenvs/websocketduli--KNzsfn6/lib/python3.8/site-packages/aioredis/connection.py:162>
Task was destroyed but it is pending!

The Python code:

@app.websocket("/feed-home/<sn>")
async def server(request, ws, sn: str):
    conn = await create_redis(REDIS_URL)
    listening_channels = [shop(sn), devices(sn)]
    tasks = None
    try:
        while True:
            coros = [send_msg(ws, sn, conn, channel) for channel in listening_channels]
            tasks = {asyncio.ensure_future(coro) for coro in coros}
            # Wait until the first listener finishes, then decide whether to close.
            done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            for done_task in done:
                if done_task.result() == "close":
                    await ws.close()
                    raise BreakLoopException("Skip while True loop.")
            for outdated_task in pending:
                outdated_task.cancel()
    except BreakLoopException as e:
        logger.error(e)
    finally:
        conn.close()
        await conn.wait_closed()
        if tasks:
            for task in tasks:
                task.cancel()
@seandstewart Is there any example code showing how to resolve this?

@nikita-davydov

I'm getting something similar to this error using FastAPI with gunicorn and uvicorn workers (which use uvloop):
Task was destroyed but it is pending! task: <Task pending name='Task-57663' coro=<RedisConnection._read_data() done, defined at /layers/google.python.pip/pip/lib/python3.8/site-packages/aioredis/connection.py:180> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x3ea2c3b858b0>()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at /layers/google.python.pip/pip/lib/python3.8/site-packages/aioredis/connection.py:168]>

It is raised randomly.

@Andrew-Chen-Wang
Collaborator

@waketzheng Try creating a pool instead and acquiring connections from the pool. You can use an async with context to handle that (or an AsyncExitStack). Also, I thought a single connection could only listen to one pattern at a time...? Ref: coros = [send_msg(ws, sn, conn, channel) for channel in listening_channels]

@ndavydovdev I think that could just be because you're not explicitly closing the pool on shutdown.
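Roughly, the pool approach suggested above could look like this. It is only a sketch against the low-level aioredis 1.x pool API; the listener registration and function names are placeholders for whatever the Sanic app actually uses:

import aioredis

REDIS_URL = "redis://localhost:6379"  # placeholder address

async def setup_pool(app, loop):
    # e.g. registered as a Sanic "before_server_start" listener
    app.pool = await aioredis.create_pool(REDIS_URL)

async def close_pool(app, loop):
    # e.g. registered as a Sanic "after_server_stop" listener
    app.pool.close()
    await app.pool.wait_closed()

async def read_channel(app, channel):
    # Acquire a dedicated connection from the pool and release it automatically.
    async with app.pool.get() as conn:
        return await conn.execute("get", channel)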

@nikita-davydov

@Andrew-Chen-Wang My app keeps working while these errors are raised. My pool is a singleton object that is only closed when my GAE instance goes down, which can happen 30-50 times per day, but now I'm getting about 1500 errors like that per day.

@Andrew-Chen-Wang
Collaborator

Andrew-Chen-Wang commented Jan 15, 2021

@ndavydovdev Are you using a framework? When I said shutdown, I actually meant one request/response cycle. Even though it's a singleton object, I know some Python frameworks use workers (like gunicorn) that close a thread at the end of a request. That means your singleton object is being destroyed but your connection isn't. Try explicitly closing your connection before the end of each request and see if it raises any more errors.

Singleton objects are useful when you need to use a pool multiple times or even concurrently, but something like Django destroys all Python objects at the end of a request. (You can take a look at Django's cache handler if you're interested; it has a signal that closes all connections at the end of a request.)
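If the app is built on FastAPI (as mentioned earlier in this thread), one way to do the per-request close is a dependency that yields a connection and always closes it when the request finishes. This is just a sketch against the aioredis 1.x API; get_redis, REDIS_URL, and the route are illustrative names, not something prescribed by aioredis:

import aioredis
from fastapi import Depends, FastAPI

app = FastAPI()
REDIS_URL = "redis://localhost:6379"  # placeholder address

async def get_redis():
    # Open a connection for this request only...
    conn = await aioredis.create_redis(REDIS_URL)
    try:
        yield conn
    finally:
        # ...and always close it when the request/response cycle ends.
        conn.close()
        await conn.wait_closed()

@app.get("/ping")
async def ping(redis=Depends(get_redis)):
    return {"pong": await redis.ping() == b"PONG"}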

@nikita-davydov

@Andrew-Chen-Wang Thanks for your advice! I'm calling conn.close() and await conn.wait_closed() after every operation (get, set, etc.).
I think I will try to create and close a connection for every request instead, because I noticed that I make around 5-6 Redis calls per request, and each one closes the existing connection in the pool and recreates a new one, so a single connection per request might also reduce my overall latency.

@leoswaldo

@ndavydovdev Hi, did you actually get it to work? I'm closing all my connections with conn.close() and await conn.wait_closed(), and the client connections do close (I can see from the Redis server that they are closed). However, this error is still thrown, as if the task itself (not the connection) were still alive. It happens very randomly, every few hours.

@nikita-davydov

nikita-davydov commented Feb 10, 2021

@leoswaldo Hi, no, I didn't, but I see that the asyncio tasks I scheduled with create_task produce the same error message in the logs - Task was destroyed but it is pending. I'm now trying to remove those create_task calls, because I can't tell whether the tasks have finished or not.
Maybe this information will help work out how to prevent these errors.
So I bet the problem is in using an ASGI server.

@nikita-davydov

nikita-davydov commented Feb 10, 2021

@leoswaldo Also, I'm sure that my FastAPI app under a gunicorn server with uvicorn workers doesn't restart at the same time the errors appear in the logs.

@MatthewScholefield
Author

MatthewScholefield commented Mar 2, 2021

I've just realized this is actually a bug in uvicorn and have created a PR, as shown above, to fix it.
Funnily enough, I also realized this same error was caused by another line near the top of my call stack that I had written as follows:

def on_startup():
    asyncio.create_task(some_function())

Since the task wasn't assigned to anything, the entire partially executed coroutine could be garbage collected. So, TL;DR:

  • If you are running under uvicorn, this PR might solve the problem.
  • Otherwise, you (or a library you use to call your code) have probably called asyncio.create_task(foo()) without assigning the result to a hard reference; see the sketch after this list.
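One common way to keep that hard reference (a minimal sketch; background_tasks is just an illustrative name) is to stash each task in a long-lived collection and drop it once the task completes:

import asyncio

# Module-level set keeps a hard reference to every in-flight task,
# so the garbage collector can never destroy them while they are pending.
background_tasks = set()

def on_startup():
    task = asyncio.create_task(some_function())
    background_tasks.add(task)
    # Drop the reference once the task finishes so the set doesn't grow forever.
    task.add_done_callback(background_tasks.discard)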

@seandstewart
Collaborator

@MatthewScholefield -

Would you be willing to test with the latest master? #891 was just merged and is a complete overhaul of the library. I'd be curious to see if the new implementation helps with this issue...

@seandstewart added the "need investigation" label Mar 19, 2021
@MatthewScholefield
Author

Actually, the original issue was determined to be a problem with other libraries, so I'll close this. If anyone else here finds that their issue is not related to external code, feel free to create a new issue.

Note that for anyone still experiencing this problem and trying to track down the root cause, the method described in this PR is a reliable way to trigger it.
