Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Warn about event loop changes using Redis/FakeRedis #622

Open
M1ha-Shvn opened this issue Sep 26, 2023 · 5 comments
Open

Warn about event loop changes using Redis/FakeRedis #622

M1ha-Shvn opened this issue Sep 26, 2023 · 5 comments

Comments

@M1ha-Shvn
Copy link

Hi.
I'm using fakeredis to run tests via pytest-asyncio in FastAPI application.

Library versions:

fakeredis==2.19.0
fastapi==0.103.1
hiredis==2.2.3
lupa==2.0
pytest-asyncio==0.21.1
redis==5.0.0

Everything works well in python 3.9 and earlier. But in 3.10 and 3.11 I start getting exceptions:

test setup failed
event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>
request = <SubRequest 'storage_clean_up' for <Function test_read_from_cache__cache_save_failure>>
kwargs = {}
setup = <function _wrap_async_fixture.<locals>._async_fixture_wrapper.<locals>.setup at 0x7feb442d5900>

    @functools.wraps(fixture)
    def _async_fixture_wrapper(
        event_loop: asyncio.AbstractEventLoop, request: SubRequest, **kwargs: Any
    ):
        func = _perhaps_rebind_fixture_func(
            fixture, request.instance, fixturedef.unittest
        )
    
        async def setup():
            res = await func(**_add_kwargs(func, kwargs, event_loop, request))
            return res
    
>       return event_loop.run_until_complete(setup())

../pip/lib/python3.10/site-packages/pytest_asyncio/plugin.py:326: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/local/lib/python3.10/asyncio/base_events.py:649: in run_until_complete
    return future.result()
../pip/lib/python3.10/site-packages/pytest_asyncio/plugin.py:323: in setup
    res = await func(**_add_kwargs(func, kwargs, event_loop, request))
/opt/project/core/storages/tests/fixtures.py:23: in storage_clean_up
    await cls.storage.truncate()
/opt/project/core/storages/redis.py:285: in truncate
    await client.flushall()
../pip/lib/python3.10/site-packages/redis/asyncio/client.py:545: in execute_command
    conn = self.connection or await pool.get_connection(command_name, **options)
../pip/lib/python3.10/site-packages/redis/asyncio/connection.py:1111: in get_connection
    if await connection.can_read_destructive():
../pip/lib/python3.10/site-packages/redis/asyncio/connection.py:472: in can_read_destructive
    return await self._parser.can_read_destructive()
../pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:179: in can_read_destructive
    return await self.read_from_socket()
../pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:184: in read_from_socket
    buffer = await self._stream.read(self._read_size)
../pip/lib/python3.10/site-packages/fakeredis/aioredis.py:83: in read
    return await self._socket.responses.get()  # type:ignore
/usr/local/lib/python3.10/asyncio/queues.py:156: in get
    getter = self._get_loop().create_future()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Queue at 0x7feb429a9c60 maxsize=0 tasks=4>

    def _get_loop(self):
        loop = events._get_running_loop()
    
        if self._loop is None:
            with _global_lock:
                if self._loop is None:
                    self._loop = loop
        if loop is not self._loop:
>           raise RuntimeError(f'{self!r} is bound to a different event loop')
E           RuntimeError: <Queue at 0x7feb429a9c60 maxsize=0 tasks=4> is bound to a different event loop

/usr/local/lib/python3.10/asyncio/mixins.py:30: RuntimeError

If I disable FakeRedis and use redis-py in tests without isolation, I start getting this error:

self = Connection<host=redis,port=6379,db=0>, disable_decoding = False
timeout = None

    async def read_response(
        self,
        disable_decoding: bool = False,
        timeout: Optional[float] = None,
        *,
        disconnect_on_error: bool = True,
        push_request: Optional[bool] = False,
    ):
        """Read the response from a previously sent command"""
        read_timeout = timeout if timeout is not None else self.socket_timeout
        host_error = self._host_error()
        try:
            if (
                read_timeout is not None
                and self.protocol in ["3", 3]
                and not HIREDIS_AVAILABLE
            ):
                async with async_timeout(read_timeout):
                    response = await self._parser.read_response(
                        disable_decoding=disable_decoding, push_request=push_request
                    )
            elif read_timeout is not None:
                async with async_timeout(read_timeout):
                    response = await self._parser.read_response(
                        disable_decoding=disable_decoding
                    )
            elif self.protocol in ["3", 3] and not HIREDIS_AVAILABLE:
                response = await self._parser.read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
            else:
>               response = await self._parser.read_response(
                    disable_decoding=disable_decoding
                )

/app/pip/lib/python3.10/site-packages/redis/asyncio/connection.py:509: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/app/pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:203: in read_response
    await self.read_from_socket()
/app/pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:184: in read_from_socket
    buffer = await self._stream.read(self._read_size)
/usr/local/lib/python3.10/asyncio/streams.py:669: in read
    await self._wait_for_data('read')
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <StreamReader transport=<_SelectorSocketTransport closing fd=11>>
func_name = 'read'

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.
    
        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')
    
        assert not self._eof, '_wait_for_data after EOF'
    
        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()
    
        self._waiter = self._loop.create_future()
        try:
>           await self._waiter
E           RuntimeError: Task <Task pending name='Task-3' coro=<_wrap_async_fixture.<locals>._async_fixture_wrapper.<locals>.setup() running at /app/pip/lib/python3.10/site-packages/pytest_asyncio/plugin.py:323> cb=[_run_until_complete_cb() at /usr/local/lib/python3.10/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop

/usr/local/lib/python3.10/asyncio/streams.py:501: RuntimeError

During handling of the above exception, another exception occurred:

cls = <core.storages.tests.test_storages.TestRedisStorage object at 0x7f8c542801c0>

    @classmethod
    @pytest_asyncio.fixture(autouse=True)
    async def storage_clean_up(cls):
>       await cls.storage.truncate()

test_storages.py:51: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../redis.py:282: in truncate
    await client.flushall()
/app/pip/lib/python3.10/site-packages/redis/asyncio/client.py:550: in execute_command
    return await conn.retry.call_with_retry(
/app/pip/lib/python3.10/site-packages/redis/asyncio/retry.py:59: in call_with_retry
    return await do()
/app/pip/lib/python3.10/site-packages/redis/asyncio/client.py:524: in _send_command_parse_response
    return await self.parse_response(conn, command_name, **options)
/app/pip/lib/python3.10/site-packages/redis/asyncio/client.py:571: in parse_response
    response = await connection.read_response()
/app/pip/lib/python3.10/site-packages/redis/asyncio/connection.py:529: in read_response
    await self.disconnect(nowait=True)
/app/pip/lib/python3.10/site-packages/redis/asyncio/connection.py:385: in disconnect
    self._writer.close()  # type: ignore[union-attr]
/usr/local/lib/python3.10/asyncio/streams.py:337: in close
    return self._transport.close()
/usr/local/lib/python3.10/asyncio/selector_events.py:706: in close
    self._loop.call_soon(self._call_connection_lost, None)
/usr/local/lib/python3.10/asyncio/base_events.py:753: in call_soon
    self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

/usr/local/lib/python3.10/asyncio/base_events.py:515: RuntimeError

Firstly, I thought, it was a problem in Fakeredis and created an issue there. But now I suppose, that it is something connected with pytest-asyncio as:

  1. Tests fail in base redis library too
  2. Redis-py does not fail without tests, my project works well while testing the server manually without tests

Can you suggest anything and where to dig more?
Thanks

@seifertm
Copy link
Contributor

Both stack traces complain about queues or tasks being attached to different event loops. Two things come to my mind:

  1. The event_loop fixture closes any existing loop and returns a new one. If you have tests that need to reuse the same loop, you need to ensure the correct scope of the event_loop fixture. You can print fixture scopes by running pytest with the option --setup-show.
  2. Async fixtures a pre-processed during collection phase. They run in their own loop. Trying to reuse a queue from a fixture in a test will trigger the error about the queue being attached to a different loop.

As to why this occurs only from Python 3.10 onwards, I don't know.

@M1ha-Shvn
Copy link
Author

Thanks for comment. I'll have a look

@seifertm seifertm added the needsinfo Requires additional information from the issue author label Oct 3, 2023
@seifertm
Copy link
Contributor

seifertm commented Oct 9, 2023

@M1ha-Shvn Any updates on your issue?

@M1ha-Shvn
Copy link
Author

I guess, my problem is a result of architecture: Redis storage connection object realises Singleton pattern. It creates single object and reuses Redis instance. For testing purposes it replaces Redis with FakeRedis. When tests are run each test is run in separate loop, but singleton object is still the same (as python thread is the same) which leads to loop changing problems when calling Redis commands.

I've fixed the problem for myself by 2 things:

  1. A hacky solution regenerating FakeRedis object on loop change:
      current_loop = asyncio.get_event_loop()
      if self._loop is not None and self._loop is not current_loop:
          self._redis_clients.clear()

          # Script should be created in same loop, where it is executed
          self._generate_worker_id_script = None

          log('asyncio_loop_changed', 'asyncio_loop_changed', level=logging.WARNING)

      self._loop = current_loop
  1. Adding a global fixture with common event_loop to conftest.py. I'm not exactly sure why, but without this some clean up fixtures inside tests didn't work properly and left my FakeRedis database in unclear state
@pytest.fixture(scope="session", autouse=True)
def event_loop() -> Generator["AbstractEventLoop", Any, None]:
    policy = get_event_loop_policy()
    loop = policy.new_event_loop()
    yield loop
    loop.close()

@seifertm seifertm added enhancement and removed needsinfo Requires additional information from the issue author labels Oct 10, 2023
@seifertm
Copy link
Contributor

Maybe there's a smarter way for pytest-asyncio to detect loop changes. I'll leave the issue open for the time being.

@seifertm seifertm changed the title RuntimeError when using Redis/FakeRedis inside pytest-asyncio Warn about event loop changes using Redis/FakeRedis Nov 10, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants