Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Synchronized reading the responses from a connection #1106

Merged
merged 3 commits into from Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/1105.bugfix
@@ -0,0 +1 @@
Synchronized reading the responses from a connection
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Expand Up @@ -25,6 +25,7 @@ Dima Kit
Dima Kruk
Dmitry Vasilishin <dmvass>
Evgenii Morozov <emorozov>
Federico Jaite <fedej>
Hugo <hugovk>
Ihor Gorobets
Ihor Liubymov
Expand Down
7 changes: 5 additions & 2 deletions aioredis/connection.py
Expand Up @@ -559,6 +559,7 @@ class Connection:
"_parser",
"_connect_callbacks",
"_buffer_cutoff",
"_lock",
"__dict__",
)

Expand Down Expand Up @@ -609,6 +610,7 @@ def __init__(
)
self._connect_callbacks: List[ConnectCallbackT] = []
self._buffer_cutoff = 6000
self._lock = asyncio.Lock()

def __repr__(self):
repr_args = ",".join((f"{k}={v}" for k, v in self.repr_pieces()))
Expand Down Expand Up @@ -850,8 +852,9 @@ async def can_read(self, timeout: float = 0):
async def read_response(self):
"""Read the response from a previously sent command"""
try:
async with async_timeout.timeout(self.socket_timeout):
response = await self._parser.read_response()
async with self._lock:
async with async_timeout.timeout(self.socket_timeout):
response = await self._parser.read_response()
except asyncio.TimeoutError:
await self.disconnect()
raise TimeoutError(f"Timeout reading from {self.host}:{self.port}")
Expand Down
7 changes: 7 additions & 0 deletions tests/test_connection.py
@@ -1,3 +1,4 @@
import asyncio
from typing import TYPE_CHECKING
from unittest import mock

Expand Down Expand Up @@ -27,3 +28,9 @@ async def test_socket_param_regression(r):
"""A regression test for issue #1060"""
conn = UnixDomainSocketConnection()
await conn.disconnect() == True


@pytest.mark.asyncio
async def test_can_run_concurrent_commands(r):
assert await r.ping() is True
assert all(await asyncio.gather(*(r.ping() for _ in range(10))))