From b69fa00a6265dcde1cb537b776c9a69685d4f56e Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Sun, 20 Nov 2022 09:52:14 +0200 Subject: [PATCH 1/7] Added a replacement for the default cluster node in the event of failure. Handles failovers better. --- CHANGES | 1 + redis/asyncio/cluster.py | 35 ++++++++++++----- redis/cluster.py | 61 ++++++++++++++++++++++++------ tests/test_asyncio/test_cluster.py | 20 ++++++++++ tests/test_cluster.py | 23 +++++++++++ 5 files changed, 120 insertions(+), 20 deletions(-) diff --git a/CHANGES b/CHANGES index 883c548f38..120af7c6d2 100644 --- a/CHANGES +++ b/CHANGES @@ -28,6 +28,7 @@ * Fixed "cannot pickle '_thread.lock' object" bug (#2354, #2297) * Added CredentialsProvider class to support password rotation * Enable Lock for asyncio cluster mode + * Added a replacement for the default cluster node in the event of failure (#2463) * 4.1.3 (Feb 8, 2022) * Fix flushdb and flushall (#1926) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index d5a38b2878..7f51a4a626 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -516,7 +516,14 @@ def set_response_callback(self, command: str, callback: ResponseCallbackT) -> No async def _determine_nodes( self, command: str, *args: Any, node_flag: Optional[str] = None - ) -> List["ClusterNode"]: + ) -> tuple[list["ClusterNode"], bool]: + """Determine which nodes should be executed the command on + + Returns: + tuple[list[Type[ClusterNode]], bool]: + A tuple containing a list of target nodes and a bool indicating + if the return node was chosen because it is the default node + """ if not node_flag: # get the nodes group for this command if it was predefined node_flag = self.command_flags.get(command) @@ -524,19 +531,21 @@ async def _determine_nodes( if node_flag in self.node_flags: if node_flag == self.__class__.DEFAULT_NODE: # return the cluster's default node - return [self.nodes_manager.default_node] + return [self.nodes_manager.default_node], True if node_flag == self.__class__.PRIMARIES: # return all primaries - return self.nodes_manager.get_nodes_by_server_type(PRIMARY) + return self.nodes_manager.get_nodes_by_server_type(PRIMARY), False if node_flag == self.__class__.REPLICAS: # return all replicas - return self.nodes_manager.get_nodes_by_server_type(REPLICA) + return self.nodes_manager.get_nodes_by_server_type(REPLICA), False if node_flag == self.__class__.ALL_NODES: # return all nodes - return list(self.nodes_manager.nodes_cache.values()) + return list(self.nodes_manager.nodes_cache.values()), False if node_flag == self.__class__.RANDOM: # return a random node - return [random.choice(list(self.nodes_manager.nodes_cache.values()))] + return [ + random.choice(list(self.nodes_manager.nodes_cache.values())) + ], False # get the node that holds the key's slot return [ @@ -544,7 +553,7 @@ async def _determine_nodes( await self._determine_slot(command, *args), self.read_from_replicas and command in READ_COMMANDS, ) - ] + ], False async def _determine_slot(self, command: str, *args: Any) -> int: if self.command_flags.get(command) == SLOT_ID: @@ -641,6 +650,7 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: command = args[0] target_nodes = [] target_nodes_specified = False + is_default_node = False retry_attempts = self.cluster_error_retry_attempts passed_targets = kwargs.pop("target_nodes", None) @@ -654,10 +664,13 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: for _ in range(execute_attempts): if self._initialize: await self.initialize() + if is_default_node: + # Replace the default cluster node + self.replace_default_node() try: if not target_nodes_specified: # Determine the nodes to execute the command on - target_nodes = await self._determine_nodes( + target_nodes, is_default_node = await self._determine_nodes( *args, node_flag=passed_targets ) if not target_nodes: @@ -1436,12 +1449,13 @@ async def _execute( ] nodes = {} + is_default_node = False for cmd in todo: passed_targets = cmd.kwargs.pop("target_nodes", None) if passed_targets and not client._is_node_flag(passed_targets): target_nodes = client._parse_target_nodes(passed_targets) else: - target_nodes = await client._determine_nodes( + target_nodes, is_default_node = await client._determine_nodes( *cmd.args, node_flag=passed_targets ) if not target_nodes: @@ -1487,6 +1501,9 @@ async def _execute( result.args = (msg,) + result.args[1:] raise result + if is_default_node: + self.replace_default_node() + return [cmd.result for cmd in stack] def _split_command_across_slots( diff --git a/redis/cluster.py b/redis/cluster.py index 91deaead59..56c354b814 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -379,6 +379,30 @@ class AbstractRedisCluster: ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, ClusterDownError) + def replace_default_node(self, target_node: "ClusterNode" = None) -> None: + """Replace the default cluster node. + A random cluster node will be chosen if target_node isn't passed, and primaries + will be prioritized. The default node will not be changed if there are no other + nodes in the cluster. + + Args: + target_node (ClusterNode, optional): Target node to replace the default + node. Defaults to None. + """ + if target_node: + self.nodes_manager.default_node = target_node + else: + curr_node = self.get_default_node() + primaries = [node for node in self.get_primaries() if node != curr_node] + if primaries: + # Choose a primary if the cluster contains different primaries + self.nodes_manager.default_node = random.choice(primaries) + else: + # Otherwise, hoose a primary if the cluster contains different primaries + replicas = [node for node in self.get_replicas() if node != curr_node] + if replicas: + self.nodes_manager.default_node = random.choice(replicas) + class RedisCluster(AbstractRedisCluster, RedisClusterCommands): @classmethod @@ -811,7 +835,14 @@ def set_response_callback(self, command, callback): """Set a custom Response Callback""" self.cluster_response_callbacks[command] = callback - def _determine_nodes(self, *args, **kwargs): + def _determine_nodes(self, *args, **kwargs) -> tuple[list["ClusterNode"], bool]: + """Determine which nodes should be executed the command on + + Returns: + tuple[list[Type[ClusterNode]], bool]: + A tuple containing a list of target nodes and a bool indicating + if the return node was chosen because it is the default node + """ command = args[0].upper() if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags: command = f"{args[0]} {args[1]}".upper() @@ -825,28 +856,28 @@ def _determine_nodes(self, *args, **kwargs): command_flag = self.command_flags.get(command) if command_flag == self.__class__.RANDOM: # return a random node - return [self.get_random_node()] + return [self.get_random_node()], False elif command_flag == self.__class__.PRIMARIES: # return all primaries - return self.get_primaries() + return self.get_primaries(), False elif command_flag == self.__class__.REPLICAS: # return all replicas - return self.get_replicas() + return self.get_replicas(), False elif command_flag == self.__class__.ALL_NODES: # return all nodes - return self.get_nodes() + return self.get_nodes(), False elif command_flag == self.__class__.DEFAULT_NODE: # return the cluster's default node - return [self.nodes_manager.default_node] + return [self.nodes_manager.default_node], True elif command in self.__class__.SEARCH_COMMANDS[0]: - return [self.nodes_manager.default_node] + return [self.nodes_manager.default_node], True else: # get the node that holds the key's slot slot = self.determine_slot(*args) node = self.nodes_manager.get_node_from_slot( slot, self.read_from_replicas and command in READ_COMMANDS ) - return [node] + return [node], False def _should_reinitialized(self): # To reinitialize the cluster on every MOVED error, @@ -990,6 +1021,7 @@ def execute_command(self, *args, **kwargs): dict """ target_nodes_specified = False + is_default_node = False target_nodes = None passed_targets = kwargs.pop("target_nodes", None) if passed_targets is not None and not self._is_nodes_flag(passed_targets): @@ -1013,7 +1045,7 @@ def execute_command(self, *args, **kwargs): res = {} if not target_nodes_specified: # Determine the nodes to execute the command on - target_nodes = self._determine_nodes( + target_nodes, is_default_node = self._determine_nodes( *args, **kwargs, nodes_flag=passed_targets ) if not target_nodes: @@ -1025,6 +1057,9 @@ def execute_command(self, *args, **kwargs): # Return the processed result return self._process_result(args[0], res, **kwargs) except Exception as e: + if is_default_node: + # Replace the default cluster node + self.replace_default_node() if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: # The nodes and slots cache were reinitialized. # Try again with the new cluster setup. @@ -1883,7 +1918,7 @@ def _send_cluster_commands( # if we have to run through it again, we only retry # the commands that failed. attempt = sorted(stack, key=lambda x: x.position) - + is_default_node = False # build a list of node objects based on node names we need to nodes = {} @@ -1900,7 +1935,7 @@ def _send_cluster_commands( if passed_targets and not self._is_nodes_flag(passed_targets): target_nodes = self._parse_target_nodes(passed_targets) else: - target_nodes = self._determine_nodes( + target_nodes, is_default_node = self._determine_nodes( *c.args, node_flag=passed_targets ) if not target_nodes: @@ -1926,6 +1961,8 @@ def _send_cluster_commands( # Connection retries are being handled in the node's # Retry object. Reinitialize the node -> slot table. self.nodes_manager.initialize() + if is_default_node: + self.replace_default_node() raise nodes[node_name] = NodeCommands( redis_node.parse_response, @@ -2007,6 +2044,8 @@ def _send_cluster_commands( self.reinitialize_counter += 1 if self._should_reinitialized(): self.nodes_manager.initialize() + if is_default_node: + self.replace_default_node() for c in attempt: try: # send each command individually like we diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 38bcaf6c00..f2d29d3d41 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -788,6 +788,26 @@ async def test_can_run_concurrent_commands(self, request: FixtureRequest) -> Non ) await rc.close() + def test_replace_cluster_node(self, r: RedisCluster) -> None: + prev_default_node = r.get_default_node() + r.replace_default_node() + assert r.get_default_node() != prev_default_node + r.replace_default_node(prev_default_node) + assert r.get_default_node() == prev_default_node + + async def test_default_node_is_replaced_after_exception(self, r): + curr_default_node = r.get_default_node() + # CLUSTER NODES command is being executed on the default node + nodes = await r.cluster_nodes() + assert "myself" in nodes.get(curr_default_node.name).get("flags") + + # Mock connection error for the default node + mock_node_resp_exc(curr_default_node, ConnectionError("error")) + # Test that the command succeed from a different node + nodes = await r.cluster_nodes() + assert "myself" not in nodes.get(curr_default_node.name).get("flags") + assert r.get_default_node() != curr_default_node + class TestClusterRedisCommands: """ diff --git a/tests/test_cluster.py b/tests/test_cluster.py index d18fbbbb33..43aeb9e045 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -791,6 +791,29 @@ def test_cluster_retry_object(self, r) -> None: == retry._retries ) + def test_replace_cluster_node(self, r) -> None: + prev_default_node = r.get_default_node() + r.replace_default_node() + assert r.get_default_node() != prev_default_node + r.replace_default_node(prev_default_node) + assert r.get_default_node() == prev_default_node + + def test_default_node_is_replaced_after_exception(self, r): + curr_default_node = r.get_default_node() + # CLUSTER NODES command is being executed on the default node + nodes = r.cluster_nodes() + assert "myself" in nodes.get(curr_default_node.name).get("flags") + + def raise_connection_error(): + raise ConnectionError("error") + + # Mock connection error for the default node + mock_node_resp_func(curr_default_node, raise_connection_error) + # Test that the command succeed from a different node + nodes = r.cluster_nodes() + assert "myself" not in nodes.get(curr_default_node.name).get("flags") + assert r.get_default_node() != curr_default_node + @pytest.mark.onlycluster class TestClusterRedisCommands: From 9cfa7ca197a7455a40f21170dbd440b74cd00301 Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Sun, 20 Nov 2022 10:57:47 +0200 Subject: [PATCH 2/7] Added ClusterNode type --- redis/cluster.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/redis/cluster.py b/redis/cluster.py index 56c354b814..9b293e031f 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -4,7 +4,7 @@ import threading import time from collections import OrderedDict -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar, Union from redis.backoff import default_backoff from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan @@ -38,6 +38,10 @@ str_if_bytes, ) +TargetNodesT = TypeVar( + "TargetNodesT", str, "ClusterNode", List["ClusterNode"], Dict[Any, "ClusterNode"] +) + def get_node_name(host: str, port: Union[str, int]) -> str: return f"{host}:{port}" From f8060b76dc87d0681bb3ef19fb8fb5527354dbf6 Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Sun, 20 Nov 2022 11:21:43 +0200 Subject: [PATCH 3/7] Fixed typing --- redis/asyncio/cluster.py | 3 ++- redis/cluster.py | 8 ++------ tests/test_asyncio/test_cluster.py | 2 ++ tests/test_cluster.py | 2 ++ 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 7f51a4a626..1630fb741c 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -11,6 +11,7 @@ List, Mapping, Optional, + Tuple, Type, TypeVar, Union, @@ -516,7 +517,7 @@ def set_response_callback(self, command: str, callback: ResponseCallbackT) -> No async def _determine_nodes( self, command: str, *args: Any, node_flag: Optional[str] = None - ) -> tuple[list["ClusterNode"], bool]: + ) -> Tuple[List["ClusterNode"], bool]: """Determine which nodes should be executed the command on Returns: diff --git a/redis/cluster.py b/redis/cluster.py index 9b293e031f..9726ab93b1 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -4,7 +4,7 @@ import threading import time from collections import OrderedDict -from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union from redis.backoff import default_backoff from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan @@ -38,10 +38,6 @@ str_if_bytes, ) -TargetNodesT = TypeVar( - "TargetNodesT", str, "ClusterNode", List["ClusterNode"], Dict[Any, "ClusterNode"] -) - def get_node_name(host: str, port: Union[str, int]) -> str: return f"{host}:{port}" @@ -839,7 +835,7 @@ def set_response_callback(self, command, callback): """Set a custom Response Callback""" self.cluster_response_callbacks[command] = callback - def _determine_nodes(self, *args, **kwargs) -> tuple[list["ClusterNode"], bool]: + def _determine_nodes(self, *args, **kwargs) -> Tuple[List["ClusterNode"], bool]: """Determine which nodes should be executed the command on Returns: diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index f2d29d3d41..d2808fcb28 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -807,6 +807,8 @@ async def test_default_node_is_replaced_after_exception(self, r): nodes = await r.cluster_nodes() assert "myself" not in nodes.get(curr_default_node.name).get("flags") assert r.get_default_node() != curr_default_node + # Rollback to the old default node + r.replace_default_node(curr_default_node) class TestClusterRedisCommands: diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 43aeb9e045..4cc2f5f103 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -813,6 +813,8 @@ def raise_connection_error(): nodes = r.cluster_nodes() assert "myself" not in nodes.get(curr_default_node.name).get("flags") assert r.get_default_node() != curr_default_node + # Rollback to the old default node + r.replace_default_node(curr_default_node) @pytest.mark.onlycluster From b3ab42afa8e514aa58f5ee65bc0cc3226a447566 Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Sun, 20 Nov 2022 18:54:27 +0200 Subject: [PATCH 4/7] Reverted change in test_auth: we'll replace the default node only for commands we allow to retry on --- redis/cluster.py | 6 +++--- tests/test_asyncio/test_cluster.py | 1 - tests/test_cluster.py | 2 -- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index 9726ab93b1..1e8bbf8583 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1057,10 +1057,10 @@ def execute_command(self, *args, **kwargs): # Return the processed result return self._process_result(args[0], res, **kwargs) except Exception as e: - if is_default_node: - # Replace the default cluster node - self.replace_default_node() if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: + if is_default_node: + # Replace the default cluster node + self.replace_default_node() # The nodes and slots cache were reinitialized. # Try again with the new cluster setup. retry_attempts -= 1 diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index d2808fcb28..cec2dc09a4 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -800,7 +800,6 @@ async def test_default_node_is_replaced_after_exception(self, r): # CLUSTER NODES command is being executed on the default node nodes = await r.cluster_nodes() assert "myself" in nodes.get(curr_default_node.name).get("flags") - # Mock connection error for the default node mock_node_resp_exc(curr_default_node, ConnectionError("error")) # Test that the command succeed from a different node diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 4cc2f5f103..43aeb9e045 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -813,8 +813,6 @@ def raise_connection_error(): nodes = r.cluster_nodes() assert "myself" not in nodes.get(curr_default_node.name).get("flags") assert r.get_default_node() != curr_default_node - # Rollback to the old default node - r.replace_default_node(curr_default_node) @pytest.mark.onlycluster From 484861e47607494264a3f016a9c74fdc0982d58a Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Sun, 27 Nov 2022 10:34:13 +0200 Subject: [PATCH 5/7] Changed determine_nodes to return only the target nodes, added a comparison to determine whether a node is the default node instead --- redis/asyncio/cluster.py | 38 ++++++++++++++++++-------------------- redis/cluster.py | 36 +++++++++++++++++++----------------- 2 files changed, 37 insertions(+), 37 deletions(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 1630fb741c..b21cb66153 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -11,7 +11,6 @@ List, Mapping, Optional, - Tuple, Type, TypeVar, Union, @@ -517,14 +516,9 @@ def set_response_callback(self, command: str, callback: ResponseCallbackT) -> No async def _determine_nodes( self, command: str, *args: Any, node_flag: Optional[str] = None - ) -> Tuple[List["ClusterNode"], bool]: - """Determine which nodes should be executed the command on - - Returns: - tuple[list[Type[ClusterNode]], bool]: - A tuple containing a list of target nodes and a bool indicating - if the return node was chosen because it is the default node - """ + ) -> List["ClusterNode"]: + # Determine which nodes should be executed the command on. + # Returns a list of target nodes. if not node_flag: # get the nodes group for this command if it was predefined node_flag = self.command_flags.get(command) @@ -532,21 +526,19 @@ async def _determine_nodes( if node_flag in self.node_flags: if node_flag == self.__class__.DEFAULT_NODE: # return the cluster's default node - return [self.nodes_manager.default_node], True + return [self.nodes_manager.default_node] if node_flag == self.__class__.PRIMARIES: # return all primaries - return self.nodes_manager.get_nodes_by_server_type(PRIMARY), False + return self.nodes_manager.get_nodes_by_server_type(PRIMARY) if node_flag == self.__class__.REPLICAS: # return all replicas - return self.nodes_manager.get_nodes_by_server_type(REPLICA), False + return self.nodes_manager.get_nodes_by_server_type(REPLICA) if node_flag == self.__class__.ALL_NODES: # return all nodes - return list(self.nodes_manager.nodes_cache.values()), False + return list(self.nodes_manager.nodes_cache.values()) if node_flag == self.__class__.RANDOM: # return a random node - return [ - random.choice(list(self.nodes_manager.nodes_cache.values())) - ], False + return [random.choice(list(self.nodes_manager.nodes_cache.values()))] # get the node that holds the key's slot return [ @@ -554,7 +546,7 @@ async def _determine_nodes( await self._determine_slot(command, *args), self.read_from_replicas and command in READ_COMMANDS, ) - ], False + ] async def _determine_slot(self, command: str, *args: Any) -> int: if self.command_flags.get(command) == SLOT_ID: @@ -671,13 +663,18 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: try: if not target_nodes_specified: # Determine the nodes to execute the command on - target_nodes, is_default_node = await self._determine_nodes( + target_nodes = await self._determine_nodes( *args, node_flag=passed_targets ) if not target_nodes: raise RedisClusterException( f"No targets were found to execute {args} command on" ) + if ( + len(target_nodes) == 1 + and target_nodes[0] == self.get_default_node() + ): + is_default_node = True if len(target_nodes) == 1: # Return the processed result @@ -1456,7 +1453,7 @@ async def _execute( if passed_targets and not client._is_node_flag(passed_targets): target_nodes = client._parse_target_nodes(passed_targets) else: - target_nodes, is_default_node = await client._determine_nodes( + target_nodes = await client._determine_nodes( *cmd.args, node_flag=passed_targets ) if not target_nodes: @@ -1465,8 +1462,9 @@ async def _execute( ) if len(target_nodes) > 1: raise RedisClusterException(f"Too many targets for command {cmd.args}") - node = target_nodes[0] + if node == client.get_default_node(): + is_default_node = True if node.name not in nodes: nodes[node.name] = (node, []) nodes[node.name][1].append(cmd) diff --git a/redis/cluster.py b/redis/cluster.py index 1e8bbf8583..0b2c4f1387 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -835,14 +835,9 @@ def set_response_callback(self, command, callback): """Set a custom Response Callback""" self.cluster_response_callbacks[command] = callback - def _determine_nodes(self, *args, **kwargs) -> Tuple[List["ClusterNode"], bool]: - """Determine which nodes should be executed the command on - - Returns: - tuple[list[Type[ClusterNode]], bool]: - A tuple containing a list of target nodes and a bool indicating - if the return node was chosen because it is the default node - """ + def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]: + # Determine which nodes should be executed the command on. + # Returns a list of target nodes. command = args[0].upper() if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags: command = f"{args[0]} {args[1]}".upper() @@ -856,28 +851,28 @@ def _determine_nodes(self, *args, **kwargs) -> Tuple[List["ClusterNode"], bool]: command_flag = self.command_flags.get(command) if command_flag == self.__class__.RANDOM: # return a random node - return [self.get_random_node()], False + return [self.get_random_node()] elif command_flag == self.__class__.PRIMARIES: # return all primaries - return self.get_primaries(), False + return self.get_primaries() elif command_flag == self.__class__.REPLICAS: # return all replicas - return self.get_replicas(), False + return self.get_replicas() elif command_flag == self.__class__.ALL_NODES: # return all nodes - return self.get_nodes(), False + return self.get_nodes() elif command_flag == self.__class__.DEFAULT_NODE: # return the cluster's default node - return [self.nodes_manager.default_node], True + return [self.nodes_manager.default_node] elif command in self.__class__.SEARCH_COMMANDS[0]: - return [self.nodes_manager.default_node], True + return [self.nodes_manager.default_node] else: # get the node that holds the key's slot slot = self.determine_slot(*args) node = self.nodes_manager.get_node_from_slot( slot, self.read_from_replicas and command in READ_COMMANDS ) - return [node], False + return [node] def _should_reinitialized(self): # To reinitialize the cluster on every MOVED error, @@ -1045,13 +1040,18 @@ def execute_command(self, *args, **kwargs): res = {} if not target_nodes_specified: # Determine the nodes to execute the command on - target_nodes, is_default_node = self._determine_nodes( + target_nodes = self._determine_nodes( *args, **kwargs, nodes_flag=passed_targets ) if not target_nodes: raise RedisClusterException( f"No targets were found to execute {args} command on" ) + if ( + len(target_nodes) == 1 + and target_nodes[0] == self.get_default_node() + ): + is_default_node = True for node in target_nodes: res[node.name] = self._execute_command(node, *args, **kwargs) # Return the processed result @@ -1935,7 +1935,7 @@ def _send_cluster_commands( if passed_targets and not self._is_nodes_flag(passed_targets): target_nodes = self._parse_target_nodes(passed_targets) else: - target_nodes, is_default_node = self._determine_nodes( + target_nodes = self._determine_nodes( *c.args, node_flag=passed_targets ) if not target_nodes: @@ -1948,6 +1948,8 @@ def _send_cluster_commands( ) node = target_nodes[0] + if node == self.get_default_node(): + is_default_node = True # now that we know the name of the node # ( it's just a string in the form of host:port ) From 4cfa069165e6e921ff94dd0b07497d27f34c044e Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Mon, 28 Nov 2022 13:29:39 +0200 Subject: [PATCH 6/7] Fixed async clusterPipeline default node replacement --- redis/asyncio/cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index b21cb66153..a4629f5399 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -1501,7 +1501,7 @@ async def _execute( raise result if is_default_node: - self.replace_default_node() + client.replace_default_node() return [cmd.result for cmd in stack] From 2388569c08b4d40143475544b48c10bac329f0b2 Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Tue, 29 Nov 2022 15:21:16 +0200 Subject: [PATCH 7/7] Changed async clusterPipeline default node to be replaced only if a relevant exception was thrown from that specific node --- redis/asyncio/cluster.py | 28 ++++++++++++++++------------ tests/test_asyncio/test_cluster.py | 19 +++++++++++++++++++ 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index a4629f5399..e0e77c74ae 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -643,7 +643,6 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: command = args[0] target_nodes = [] target_nodes_specified = False - is_default_node = False retry_attempts = self.cluster_error_retry_attempts passed_targets = kwargs.pop("target_nodes", None) @@ -657,7 +656,10 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: for _ in range(execute_attempts): if self._initialize: await self.initialize() - if is_default_node: + if ( + len(target_nodes) == 1 + and target_nodes[0] == self.get_default_node() + ): # Replace the default cluster node self.replace_default_node() try: @@ -670,11 +672,6 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: raise RedisClusterException( f"No targets were found to execute {args} command on" ) - if ( - len(target_nodes) == 1 - and target_nodes[0] == self.get_default_node() - ): - is_default_node = True if len(target_nodes) == 1: # Return the processed result @@ -1447,7 +1444,6 @@ async def _execute( ] nodes = {} - is_default_node = False for cmd in todo: passed_targets = cmd.kwargs.pop("target_nodes", None) if passed_targets and not client._is_node_flag(passed_targets): @@ -1463,8 +1459,6 @@ async def _execute( if len(target_nodes) > 1: raise RedisClusterException(f"Too many targets for command {cmd.args}") node = target_nodes[0] - if node == client.get_default_node(): - is_default_node = True if node.name not in nodes: nodes[node.name] = (node, []) nodes[node.name][1].append(cmd) @@ -1500,8 +1494,18 @@ async def _execute( result.args = (msg,) + result.args[1:] raise result - if is_default_node: - client.replace_default_node() + default_node = nodes.get(client.get_default_node().name) + if default_node is not None: + # This pipeline execution used the default node, check if we need + # to replace it. + # Note: when the error is raised we'll reset the default node in the + # caller function. + for cmd in default_node[1]: + # Check if it has a command that failed with a relevant + # exception + if type(cmd.result) in self.__class__.ERRORS_ALLOW_RETRY: + client.replace_default_node() + break return [cmd.result for cmd in stack] diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index cec2dc09a4..02efe1234e 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -2612,6 +2612,25 @@ async def test_can_run_concurrent_pipelines(self, r: RedisCluster) -> None: *(self.test_multi_key_operation_with_multi_slots(r) for i in range(100)), ) + @pytest.mark.onlycluster + async def test_cluster_pipeline_with_default_node_error_command(self, r): + """ + Test that the default node is being replaced when it raises a relevant exception + """ + curr_default_node = r.get_default_node() + err = ConnectionError("error") + cmd_count = await r.command_count() + mock_node_resp_exc(curr_default_node, err) + async with r.pipeline(transaction=False) as pipe: + pipe.command_count() + result = await pipe.execute(raise_on_error=False) + + assert result[0] == err + assert r.get_default_node() != curr_default_node + pipe.command_count() + result = await pipe.execute(raise_on_error=False) + assert result[0] == cmd_count + @pytest.mark.ssl class TestSSL: