From 79889a5b82c68ac7fc3ad38cb8140ed667f4fbba Mon Sep 17 00:00:00 2001 From: Marcin Halastra Date: Wed, 7 Dec 2022 10:33:31 +0100 Subject: [PATCH 1/6] Add addr info to RPCReceiveError --- locust/rpc/zmqrpc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/locust/rpc/zmqrpc.py b/locust/rpc/zmqrpc.py index 2ec3b7c000..3a55088615 100644 --- a/locust/rpc/zmqrpc.py +++ b/locust/rpc/zmqrpc.py @@ -49,7 +49,7 @@ def recv_from_client(self): try: msg = Message.unserialize(data[1]) except (UnicodeDecodeError, msgerr.ExtraData) as e: - raise RPCReceiveError("ZMQ interrupted or corrupted message") from e + raise RPCReceiveError(f"ZMQ interrupted or corrupted message from {addr}") from e return addr, msg def close(self, linger=None): From c5feb095c3d22a62df51ad3355ef26e515ef8d66 Mon Sep 17 00:00:00 2001 From: Marcin Halastra Date: Wed, 7 Dec 2022 10:55:50 +0100 Subject: [PATCH 2/6] Add addr info to RPCReceiveError #2 --- locust/exception.py | 3 +++ locust/rpc/zmqrpc.py | 2 +- locust/runners.py | 4 +++- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/locust/exception.py b/locust/exception.py index 66ae64ad10..9a3fe7a880 100644 --- a/locust/exception.py +++ b/locust/exception.py @@ -68,6 +68,9 @@ class RPCReceiveError(Exception): When raised from zmqrpc, client connection should be reestablished. """ + def __init__(self, *args: object, addr=None) -> None: + super().__init__(*args) + self.addr = addr class AuthCredentialsError(ValueError): diff --git a/locust/rpc/zmqrpc.py b/locust/rpc/zmqrpc.py index 3a55088615..868dfccd85 100644 --- a/locust/rpc/zmqrpc.py +++ b/locust/rpc/zmqrpc.py @@ -49,7 +49,7 @@ def recv_from_client(self): try: msg = Message.unserialize(data[1]) except (UnicodeDecodeError, msgerr.ExtraData) as e: - raise RPCReceiveError(f"ZMQ interrupted or corrupted message from {addr}") from e + raise RPCReceiveError("ZMQ interrupted or corrupted message", addr=addr) from e return addr, msg def close(self, linger=None): diff --git a/locust/runners.py b/locust/runners.py index aa4c3f0159..f161b9ee28 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -989,7 +989,9 @@ def client_listener(self) -> NoReturn: client_id, msg = self.server.recv_from_client() except RPCReceiveError as e: # TODO: Add proper reconnect if https://github.com/zeromq/pyzmq/issues/1809 fixed - logger.error(f"Unrecognized message detected: {e}") + addr = e.addr + message = f'{e}' if not addr else f'{e} from {addr}' + logger.error(f"Unrecognized message detected: {message}") continue except RPCSendError as e: logger.error(f"Error sending reconnect message to worker: {e}. Will reset RPC server.") From af1c521507d95c3c912c3f8db06026c39618aab7 Mon Sep 17 00:00:00 2001 From: Marcin Halastra Date: Wed, 7 Dec 2022 11:55:08 +0100 Subject: [PATCH 3/6] Restore reconnect when client_id is known --- locust/runners.py | 20 +++++++++---- locust/test/test_runners.py | 59 ++++++++++++++++++++++++++++++++++++- 2 files changed, 73 insertions(+), 6 deletions(-) diff --git a/locust/runners.py b/locust/runners.py index f161b9ee28..31c5dda207 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -988,11 +988,21 @@ def client_listener(self) -> NoReturn: try: client_id, msg = self.server.recv_from_client() except RPCReceiveError as e: - # TODO: Add proper reconnect if https://github.com/zeromq/pyzmq/issues/1809 fixed - addr = e.addr - message = f'{e}' if not addr else f'{e} from {addr}' - logger.error(f"Unrecognized message detected: {message}") - continue + client_id = e.addr + + if client_id and client_id in self.clients: + logger.error(f"RPCError when receiving from client: {e}. Will reset client {client_id}.") + try: + self.server.send_to_client(Message("reconnect", None, client_id)) + except Exception as e: + logger.error(f"Error sending reconnect message to worker: {e}. Will reset RPC server.") + self.connection_broken = True + gevent.sleep(FALLBACK_INTERVAL) + continue + else: + message = f"{e}" if not client_id else f"{e} from {client_id}" + logger.error(f"Unrecognized message detected: {message}") + continue except RPCSendError as e: logger.error(f"Error sending reconnect message to worker: {e}. Will reset RPC server.") self.connection_broken = True diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index 21fea1c697..adb563c974 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -47,6 +47,8 @@ NETWORK_BROKEN = "network broken" BAD_MESSAGE = "bad message" +UNRECOGNIZED_HOST_MESSAGE = "unrecognized host message" +UNRECOGNIZED_MESSAGE = "unrecognized message" def mocked_rpc(raise_on_close=True): @@ -82,7 +84,11 @@ def recv_from_client(self): if msg.data == NETWORK_BROKEN: raise RPCError() if msg.data == BAD_MESSAGE: - raise RPCReceiveError("Bad message") + raise RPCReceiveError(BAD_MESSAGE, addr=msg.node_id) + if msg.data == UNRECOGNIZED_HOST_MESSAGE: + raise RPCReceiveError(UNRECOGNIZED_HOST_MESSAGE, addr="FAKE") + if msg.data == UNRECOGNIZED_MESSAGE: + raise RPCReceiveError(UNRECOGNIZED_MESSAGE) return msg.node_id, msg def close(self, linger=None): @@ -3040,6 +3046,57 @@ def my_task(self): master.start(10, 10) sleep(0.1) server.mocked_send(Message("stats", BAD_MESSAGE, "zeh_fake_client1")) + self.assertEqual(4, len(server.outbox)) + + # Expected message order in outbox: ack, spawn, reconnect, ack + self.assertEqual( + "reconnect", server.outbox[2][1].type, "Master didn't send worker reconnect message when expected." + ) + + def test_worker_sends_unrecognized_message_to_master(self): + """ + Validate master ignores message from worker when it cannot parse adddress info. + """ + + class TestUser(User): + @task + def my_task(self): + pass + + with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: + master = self.get_runner(user_classes=[TestUser]) + server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1")) + self.assertEqual(1, len(master.clients)) + self.assertTrue( + "zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict" + ) + + master.start(10, 10) + sleep(0.1) + server.mocked_send(Message("stats", UNRECOGNIZED_MESSAGE, "zeh_fake_client1")) + self.assertEqual(2, len(server.outbox)) + + def test_unknown_host_sends_message_to_master(self): + """ + Validate master ignores message that is. + """ + + class TestUser(User): + @task + def my_task(self): + pass + + with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: + master = self.get_runner(user_classes=[TestUser]) + server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1")) + self.assertEqual(1, len(master.clients)) + self.assertTrue( + "zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict" + ) + + master.start(10, 10) + sleep(0.1) + server.mocked_send(Message("stats", UNRECOGNIZED_HOST_MESSAGE, "unknown_host")) self.assertEqual(2, len(server.outbox)) From 233dd68e1348d38db53ab0dfe82b58636560f5f9 Mon Sep 17 00:00:00 2001 From: Marcin Halastra Date: Wed, 7 Dec 2022 11:58:11 +0100 Subject: [PATCH 4/6] Reformat file --- locust/exception.py | 1 + 1 file changed, 1 insertion(+) diff --git a/locust/exception.py b/locust/exception.py index 9a3fe7a880..6ba264be21 100644 --- a/locust/exception.py +++ b/locust/exception.py @@ -68,6 +68,7 @@ class RPCReceiveError(Exception): When raised from zmqrpc, client connection should be reestablished. """ + def __init__(self, *args: object, addr=None) -> None: super().__init__(*args) self.addr = addr From e2de7e5014c759d84469124bcddbf665fe94f655 Mon Sep 17 00:00:00 2001 From: Marcin Halastra Date: Wed, 7 Dec 2022 12:11:18 +0100 Subject: [PATCH 5/6] Fix mypy error --- locust/runners.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/locust/runners.py b/locust/runners.py index 31c5dda207..63f3465139 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -994,8 +994,8 @@ def client_listener(self) -> NoReturn: logger.error(f"RPCError when receiving from client: {e}. Will reset client {client_id}.") try: self.server.send_to_client(Message("reconnect", None, client_id)) - except Exception as e: - logger.error(f"Error sending reconnect message to worker: {e}. Will reset RPC server.") + except Exception as error: + logger.error(f"Error sending reconnect message to worker: {error}. Will reset RPC server.") self.connection_broken = True gevent.sleep(FALLBACK_INTERVAL) continue From b86075ad7fca502243ba78acb8cb90d6e9bd39c8 Mon Sep 17 00:00:00 2001 From: Marcin Halastra Date: Wed, 7 Dec 2022 12:29:20 +0100 Subject: [PATCH 6/6] Update test description --- locust/test/test_runners.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index adb563c974..2042d32dcb 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -3078,7 +3078,7 @@ def my_task(self): def test_unknown_host_sends_message_to_master(self): """ - Validate master ignores message that is. + Validate master ignores message that is sent from unknown host """ class TestUser(User):