From 65d95e5640eb51b801d443ce56a91b47ddb27dcb Mon Sep 17 00:00:00 2001 From: Federico Jaite Date: Thu, 12 Aug 2021 19:59:09 +0100 Subject: [PATCH 1/3] Allow concurrent coroutines sending commands over a single connection --- CHANGES/1105.bugfix | 1 + aioredis/connection.py | 6 ++++-- tests/test_connection.py | 7 +++++++ 3 files changed, 12 insertions(+), 2 deletions(-) create mode 100644 CHANGES/1105.bugfix diff --git a/CHANGES/1105.bugfix b/CHANGES/1105.bugfix new file mode 100644 index 000000000..8eab92a58 --- /dev/null +++ b/CHANGES/1105.bugfix @@ -0,0 +1 @@ +Allow concurrent coroutines sending commands over a single connection diff --git a/aioredis/connection.py b/aioredis/connection.py index 6ec3741af..3b4a56ca0 100644 --- a/aioredis/connection.py +++ b/aioredis/connection.py @@ -609,6 +609,7 @@ def __init__( ) self._connect_callbacks: List[ConnectCallbackT] = [] self._buffer_cutoff = 6000 + self._lock = asyncio.Lock() def __repr__(self): repr_args = ",".join((f"{k}={v}" for k, v in self.repr_pieces())) @@ -850,8 +851,9 @@ async def can_read(self, timeout: float = 0): async def read_response(self): """Read the response from a previously sent command""" try: - async with async_timeout.timeout(self.socket_timeout): - response = await self._parser.read_response() + async with self._lock: + async with async_timeout.timeout(self.socket_timeout): + response = await self._parser.read_response() except asyncio.TimeoutError: await self.disconnect() raise TimeoutError(f"Timeout reading from {self.host}:{self.port}") diff --git a/tests/test_connection.py b/tests/test_connection.py index 70762adf7..df5da7ad1 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,3 +1,4 @@ +import asyncio from typing import TYPE_CHECKING from unittest import mock @@ -27,3 +28,9 @@ async def test_socket_param_regression(r): """A regression test for issue #1060""" conn = UnixDomainSocketConnection() await conn.disconnect() == True + + +@pytest.mark.asyncio +async def test_can_run_concurrent_commands(r): + assert await r.ping() is True + assert all(await asyncio.gather(*(r.ping() for _ in range(10)))) From b6d3f1d3f39248a1361e6b7496590d2c6bff554a Mon Sep 17 00:00:00 2001 From: Federico Jaite Date: Thu, 12 Aug 2021 20:04:19 +0100 Subject: [PATCH 2/3] Adding fedej as contributor --- CONTRIBUTORS.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index d36302a9e..d453b98fb 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -25,6 +25,7 @@ Dima Kit Dima Kruk Dmitry Vasilishin Evgenii Morozov +Federico Jaite Hugo Ihor Gorobets Ihor Liubymov From 69835822a0e1f28c2f55e13e2b0da5286631e122 Mon Sep 17 00:00:00 2001 From: Federico Jaite Date: Fri, 13 Aug 2021 14:04:07 +0100 Subject: [PATCH 3/3] Tweak change description and add slot --- CHANGES/1105.bugfix | 2 +- aioredis/connection.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES/1105.bugfix b/CHANGES/1105.bugfix index 8eab92a58..916f9ea8f 100644 --- a/CHANGES/1105.bugfix +++ b/CHANGES/1105.bugfix @@ -1 +1 @@ -Allow concurrent coroutines sending commands over a single connection +Synchronized reading the responses from a connection diff --git a/aioredis/connection.py b/aioredis/connection.py index 3b4a56ca0..8c7e4a18d 100644 --- a/aioredis/connection.py +++ b/aioredis/connection.py @@ -559,6 +559,7 @@ class Connection: "_parser", "_connect_callbacks", "_buffer_cutoff", + "_lock", "__dict__", )