Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1106 from fedej/master
Browse files Browse the repository at this point in the history
Synchronized reading the responses from a connection
  • Loading branch information
abrookins committed Aug 26, 2021
2 parents 9d71774 + 6983582 commit 1855756
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGES/1105.bugfix
@@ -0,0 +1 @@
Synchronized reading the responses from a connection
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Expand Up @@ -25,6 +25,7 @@ Dima Kit
Dima Kruk
Dmitry Vasilishin <dmvass>
Evgenii Morozov <emorozov>
Federico Jaite <fedej>
Hugo <hugovk>
Ihor Gorobets
Ihor Liubymov
Expand Down
7 changes: 5 additions & 2 deletions aioredis/connection.py
Expand Up @@ -559,6 +559,7 @@ class Connection:
"_parser",
"_connect_callbacks",
"_buffer_cutoff",
"_lock",
"__dict__",
)

Expand Down Expand Up @@ -609,6 +610,7 @@ def __init__(
)
self._connect_callbacks: List[ConnectCallbackT] = []
self._buffer_cutoff = 6000
self._lock = asyncio.Lock()

def __repr__(self):
repr_args = ",".join((f"{k}={v}" for k, v in self.repr_pieces()))
Expand Down Expand Up @@ -850,8 +852,9 @@ async def can_read(self, timeout: float = 0):
async def read_response(self):
"""Read the response from a previously sent command"""
try:
async with async_timeout.timeout(self.socket_timeout):
response = await self._parser.read_response()
async with self._lock:
async with async_timeout.timeout(self.socket_timeout):
response = await self._parser.read_response()
except asyncio.TimeoutError:
await self.disconnect()
raise TimeoutError(f"Timeout reading from {self.host}:{self.port}")
Expand Down
7 changes: 7 additions & 0 deletions tests/test_connection.py
@@ -1,3 +1,4 @@
import asyncio
from typing import TYPE_CHECKING
from unittest import mock

Expand Down Expand Up @@ -27,3 +28,9 @@ async def test_socket_param_regression(r):
"""A regression test for issue #1060"""
conn = UnixDomainSocketConnection()
await conn.disconnect() == True


@pytest.mark.asyncio
async def test_can_run_concurrent_commands(r):
assert await r.ping() is True
assert all(await asyncio.gather(*(r.ping() for _ in range(10))))

0 comments on commit 1855756

Please sign in to comment.