
Task was destroyed but it is pending #154

Closed
argaen opened this issue Sep 30, 2016 · 14 comments

Comments

@argaen
Contributor

argaen commented Sep 30, 2016

Hi, I'm receiving multiple messages stating the following:

Task was destroyed but it is pending!
task: <Task pending coro=<RedisPool._do_close() running at /Users/manuelmiranda/.virtualenvs/redis-cache/lib/python3.5/site-packages/aioredis/pool.py:102> wait_for=<Future pending cb=[Task._wakeup()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<RedisConnection._read_data() running at /Users/manuelmiranda/.virtualenvs/redis-cache/lib/python3.5/site-packages/aioredis/connection.py:131> wait_for=<Future pending cb=[Task._wakeup()]> cb=[Future.set_result()]>
Task was destroyed but it is pending!

The code I'm using:

    async def _connect(self):
        if self._pool is None:
            self._pool = await aioredis.create_pool(
                (self.endpoint, self.port))

        return await self._pool

    async def get(self, key):
        with await self._connect() as client:
            await client.get(key)

Am I missing something? Isn't the context manager supposed to close the pool when it exits?

Thanks!

@popravich
Contributor

Hi,
Try the following:

async def _connect(self):
    if self._pool is None:
        self._pool = await aioredis.create_pool(
            (self.endpoint, self.port))
    return self._pool  # here no await needed

async def get(self, key):
    async with await self._connect() as client:  # use 'async with'
        await client.get(key)

@argaen
Contributor Author

argaen commented Sep 30, 2016

Thanks for the quick response! With the changes it gives the following error:

>       async with await self._connect() as client:
E       AttributeError: __aexit__

@popravich
Contributor

Try it without async with, just plain with.

@argaen
Contributor Author

argaen commented Oct 1, 2016

Not working:

aiocache/redis.py:50: in set
    with await self._connect() as redis:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <aioredis.pool.RedisPool object at 0x7f4a452acf60>

    def __enter__(self):
        raise RuntimeError(
>           "'yield from' should be used as a context manager expression")
E       RuntimeError: 'yield from' should be used as a context manager expression

../../.virtualenvs/aiocache/lib/python3.5/site-packages/aioredis/pool.py:256: RuntimeError

The only way I've been able to make it work is with the first version I posted in the first message.

@popravich
Contributor

async def _connect(self):
    if self._pool is None:
        self._pool = await aioredis.create_pool(
            (self.endpoint, self.port))
    return self._pool  # here no await needed

async def get(self, key):
    async with (await self._connect()).get() as client:  # use 'async with'
        await client.get(key)

This works for me.

@argaen
Contributor Author

argaen commented Oct 3, 2016

Yeah, and it does for me too; the first code I posted works as well. I opened this because the message seems to appear for every operation, so it looks like something is not being completed or closed correctly. It also happens with the code you posted (I adapted it to run from a main):

import aioredis
import asyncio

async def _connect():
    _pool = await aioredis.create_pool(
        ("127.0.0.1", 6379))
    return _pool

async def get(key):
    async with (await _connect()).get() as client:  # use 'async with'
        print(await client.get(key))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(get("key"))

This shows the following in the output:

23:24 $ python main.py 
None
Task was destroyed but it is pending!
task: <Task pending coro=<RedisPool._do_close() done, defined at /home/blck/.virtualenvs/aiocache/lib/python3.5/site-packages/aioredis/pool.py:100> wait_for=<Future pending cb=[Task._wakeup()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<RedisConnection._read_data() running at /home/blck/.virtualenvs/aiocache/lib/python3.5/site-packages/aioredis/connection.py:131> wait_for=<Future pending cb=[Task._wakeup()]> cb=[Future.set_result()]>

@popravich
Contributor

This is expected behavior, as pool.close() / await pool.wait_closed() never gets called.
Each connection starts a "background" task to read data from the socket, and that task runs forever;
pool.close() / await pool.wait_closed() stops the task and waits until everything finishes correctly.
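For example, the adapted main from the previous comment could shut down cleanly like this (a minimal sketch reusing the same create_pool call and the pool.get() context manager from earlier in this thread; the try/finally placement is just one option):

import aioredis
import asyncio

async def get(key):
    # Create the pool, use it, then close it and wait until the
    # background reader task has finished before the loop exits.
    pool = await aioredis.create_pool(("127.0.0.1", 6379))
    try:
        async with pool.get() as client:
            return await client.get(key)
    finally:
        pool.close()
        await pool.wait_closed()

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(get("key")))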

@argaen
Contributor Author

argaen commented Oct 4, 2016

Ummm, interesting. And shouldn't the pool be closed when the context manager exits? I'm trying to solve this because it prints this message for each test and I'd like to keep the output clean. I've tried to close the pool in the teardown of the pytest fixture, but that's not working either. Can you have a look at https://github.com/argaen/aiocache/blob/master/tests/test_redis.py#L39 and let me know if I'm doing something wrong?

The code implementing the connect call is at https://github.com/argaen/aiocache/blob/master/aiocache/backends/redis.py#L94

Thanks a lot for your time :)

@popravich
Contributor

No problem)

And shouldn't the pool be closed when the context manager exits?

No, that would result in establishing a new connection to Redis for every get/multi_get/set/etc.,
which would be very inefficient and add latency.
Closing the pool should be done at the end of the program.

test_redis.py: shouldn't the redis_cache fixture be a yield_fixture? Maybe that is the problem...
You can check the aioredis test fixtures.

To make Redis closable, the backend should be instantiated and used explicitly, so that one could call
backend.close() or backend._pool.close():

def main(loop):
    redis_backend = config_default_backend(...)
    try:
        loop.run_until_complete(run_main_program())
    finally:
        redis_backend._pool.close()
        loop.run_until_complete(redis_backend._pool.wait_closed())

@argaen
Contributor Author

argaen commented Oct 5, 2016

test_redis.py: shouldn't the redis_cache fixture be a yield_fixture? Maybe that is the problem...
You can check the aioredis test fixtures.

Nope, in pytest 3.0 if a fixture has a yield it implicitly becomes an "old" yield_fixture. It works out of the box now.

D'oh! I thought close() and wait_closed() were just the sync and async ways to close the Redis connection, respectively. My bad. After transforming the fixture to:

@pytest.fixture
def redis_cache(event_loop, mocker):
    cache = RedisCache(namespace="test", loop=event_loop)
    yield cache
    event_loop.run_until_complete(cache.delete(KEY))
    event_loop.run_until_complete(cache.delete("random"))
    cache._pool.close()
    event_loop.run_until_complete(cache._pool.wait_closed())

works perfectly. Thanks a lot for the help!

@nikita-davydov

nikita-davydov commented Jan 15, 2021

I'm still receiving this error. I'm using FastAPI with aioredis for caching.

My pool code:

class RedisPool:
    redis_pool: Optional[Redis] = None

    def __init__(self, redis_host: SocketHostPortCredentials):
        if redis_host.SOCKET:
            self.redis_host = redis_host.SOCKET
        else:
            self.redis_host = (redis_host.HOST, redis_host.PORT)  # type:ignore
        redis_pool_context_var.set(self)

    @property
    async def pool(self) -> Redis:
        if not self.redis_pool:
            self.redis_pool = await aioredis.create_redis_pool(self.redis_host, maxsize=100)
        return await self.redis_pool

    @backoff.on_exception(backoff.expo, ConnectionError, max_tries=3)
    async def set(
            self,
            key: str,
            value: Any,
            *,
            expire: int = 0,
            pexpire: int = 0,
            exist: Optional[bool] = None,
    ) -> None:
        with await self.pool as conn:
            await conn.set(
                key, value, expire=expire, pexpire=pexpire, exist=exist
            )
            # await self.close(conn)

    @backoff.on_exception(backoff.expo, ConnectionError, max_tries=3)
    async def get(self, key: str, *, encoding: object = util._NOTSET) -> Any:
        with await self.pool as conn:
            result = await conn.get(key, encoding=encoding)
            # await self.close(conn)
            return result

    @backoff.on_exception(backoff.expo, ConnectionError, max_tries=3)
    async def close(self, conn):
        conn.close()
        await conn.wait_closed()

It doesn't work either way, with await self.close(conn) or without it.

@nikita-davydov

@popravich Can you help me? Maybe I'm doing something wrong in my Pool; I've tried a lot of variants of this code and the result is the same:

Task was destroyed but it is pending! task: <Task pending name='Task-57663' coro=<RedisConnection._read_data() done, defined at /layers/google.python.pip/pip/lib/python3.8/site-packages/aioredis/connection.py:180> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x3ea2c3b858b0>()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at /layers/google.python.pip/pip/lib/python3.8/site-packages/aioredis/connection.py:168]>

@jp3049

jp3049 commented Jan 21, 2021

@popravich Can you help me? Maybe I'm doing something wrong in my Pool; I've tried a lot of variants of this code and the result is the same:

Task was destroyed but it is pending! task: <Task pending name='Task-57663' coro=<RedisConnection._read_data() done, defined at /layers/google.python.pip/pip/lib/python3.8/site-packages/aioredis/connection.py:180> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x3ea2c3b858b0>()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at /layers/google.python.pip/pip/lib/python3.8/site-packages/aioredis/connection.py:168]>

Just solved this by downgrading Python to 3.6.

@Andrew-Chen-Wang
Collaborator

@jp3049 I highly recommend staying on Python 3.8: good practices, more features, fewer bugs, etc. Please see my comments in #878 to check if that helps.
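For the FastAPI case above, the same rule applies: close the pool once at application shutdown rather than per request. A minimal sketch (assuming a module-level RedisPool instance as in the earlier snippet; the hook name and redis_host value are illustrative):

from fastapi import FastAPI

app = FastAPI()
redis_pool = RedisPool(redis_host)  # hypothetical: redis_host built from your SocketHostPortCredentials

@app.on_event("shutdown")
async def close_redis_pool():
    # Close the underlying aioredis pool once, at shutdown, so the
    # _read_data() reader tasks can finish before the loop is torn down.
    if redis_pool.redis_pool is not None:
        redis_pool.redis_pool.close()
        await redis_pool.redis_pool.wait_closed()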
