Skip to content

Commit

Permalink
[GROW-3134] release already acquired connections, when get_connection…
Browse files Browse the repository at this point in the history
…s raised an exception (redis#3)

* [GROW-3134] release already acquired connections, when get_connections raised an exception

* fix style

(cherry picked from commit 94bb915)
  • Loading branch information
zach-iee committed Jun 28, 2023
1 parent 28d80e1 commit 36ef785
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
2 changes: 2 additions & 0 deletions redis/cluster.py
Expand Up @@ -1995,6 +1995,8 @@ def _send_cluster_commands(
try:
connection = get_connection(redis_node, c.args)
except (ConnectionError, TimeoutError) as e:
for n in nodes.values():
n.connection_pool.release(n.connection)
if self.retry and isinstance(e, self.retry._supported_errors):
backoff = self.retry._backoff.compute(attempts_count)
if backoff > 0:
Expand Down
28 changes: 28 additions & 0 deletions tests/test_cluster.py
Expand Up @@ -7,6 +7,7 @@

import pytest

import redis.cluster
from redis import Redis
from redis.backoff import (
ConstantBackoff,
Expand Down Expand Up @@ -2893,6 +2894,33 @@ def raise_ask_error():
assert ask_node.redis_connection.connection.read_response.called
assert res == ["MOCK_OK"]

@pytest.mark.parametrize("error", [ConnectionError, TimeoutError])
def test_return_previous_acquired_connections(self, r, error):
# in order to ensure that a pipeline will make use of connections
# from different nodes
assert r.keyslot("a") != r.keyslot("b")

orig_func = redis.cluster.get_connection
with patch("redis.cluster.get_connection") as get_connection:

def raise_error(target_node, *args, **kwargs):
if get_connection.call_count == 2:
raise error("mocked error")
else:
return orig_func(target_node, *args, **kwargs)

get_connection.side_effect = raise_error

r.pipeline().get("a").get("b").execute()

# there should have been two get_connections per execution and
# two executions due to exception raised in the first execution
assert get_connection.call_count == 4
for cluster_node in r.nodes_manager.nodes_cache.values():
connection_pool = cluster_node.redis_connection.connection_pool
num_of_conns = len(connection_pool._available_connections)
assert num_of_conns == connection_pool._created_connections

def test_empty_stack(self, r):
"""
If pipeline is executed with no commands it should
Expand Down

0 comments on commit 36ef785

Please sign in to comment.