diff --git a/CHANGES/1105.bugfix b/CHANGES/1105.bugfix new file mode 100644 index 000000000..916f9ea8f --- /dev/null +++ b/CHANGES/1105.bugfix @@ -0,0 +1 @@ +Synchronized reading the responses from a connection diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index d36302a9e..d453b98fb 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -25,6 +25,7 @@ Dima Kit Dima Kruk Dmitry Vasilishin Evgenii Morozov +Federico Jaite Hugo Ihor Gorobets Ihor Liubymov diff --git a/aioredis/connection.py b/aioredis/connection.py index 6ec3741af..8c7e4a18d 100644 --- a/aioredis/connection.py +++ b/aioredis/connection.py @@ -559,6 +559,7 @@ class Connection: "_parser", "_connect_callbacks", "_buffer_cutoff", + "_lock", "__dict__", ) @@ -609,6 +610,7 @@ def __init__( ) self._connect_callbacks: List[ConnectCallbackT] = [] self._buffer_cutoff = 6000 + self._lock = asyncio.Lock() def __repr__(self): repr_args = ",".join((f"{k}={v}" for k, v in self.repr_pieces())) @@ -850,8 +852,9 @@ async def can_read(self, timeout: float = 0): async def read_response(self): """Read the response from a previously sent command""" try: - async with async_timeout.timeout(self.socket_timeout): - response = await self._parser.read_response() + async with self._lock: + async with async_timeout.timeout(self.socket_timeout): + response = await self._parser.read_response() except asyncio.TimeoutError: await self.disconnect() raise TimeoutError(f"Timeout reading from {self.host}:{self.port}") diff --git a/tests/test_connection.py b/tests/test_connection.py index 70762adf7..df5da7ad1 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,3 +1,4 @@ +import asyncio from typing import TYPE_CHECKING from unittest import mock @@ -27,3 +28,9 @@ async def test_socket_param_regression(r): """A regression test for issue #1060""" conn = UnixDomainSocketConnection() await conn.disconnect() == True + + +@pytest.mark.asyncio +async def test_can_run_concurrent_commands(r): + assert await r.ping() is True + assert all(await asyncio.gather(*(r.ping() for _ in range(10))))