Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Ask worker to reconnect if master gets a broken RPC message #2271

Merged
merged 7 commits into from Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions locust/exception.py
Expand Up @@ -69,6 +69,10 @@ class RPCReceiveError(Exception):
When raised from zmqrpc, client connection should be reestablished.
"""

def __init__(self, *args: object, addr=None) -> None:
super().__init__(*args)
self.addr = addr


class AuthCredentialsError(ValueError):
"""
Expand Down
2 changes: 1 addition & 1 deletion locust/rpc/zmqrpc.py
Expand Up @@ -49,7 +49,7 @@ def recv_from_client(self):
try:
msg = Message.unserialize(data[1])
except (UnicodeDecodeError, msgerr.ExtraData) as e:
raise RPCReceiveError("ZMQ interrupted or corrupted message") from e
raise RPCReceiveError("ZMQ interrupted or corrupted message", addr=addr) from e
return addr, msg

def close(self, linger=None):
Expand Down
18 changes: 15 additions & 3 deletions locust/runners.py
Expand Up @@ -988,9 +988,21 @@ def client_listener(self) -> NoReturn:
try:
client_id, msg = self.server.recv_from_client()
except RPCReceiveError as e:
# TODO: Add proper reconnect if https://github.com/zeromq/pyzmq/issues/1809 fixed
logger.error(f"Unrecognized message detected: {e}")
continue
client_id = e.addr

if client_id and client_id in self.clients:
logger.error(f"RPCError when receiving from client: {e}. Will reset client {client_id}.")
try:
self.server.send_to_client(Message("reconnect", None, client_id))
except Exception as error:
logger.error(f"Error sending reconnect message to worker: {error}. Will reset RPC server.")
self.connection_broken = True
gevent.sleep(FALLBACK_INTERVAL)
continue
else:
message = f"{e}" if not client_id else f"{e} from {client_id}"
logger.error(f"Unrecognized message detected: {message}")
continue
except RPCSendError as e:
logger.error(f"Error sending reconnect message to worker: {e}. Will reset RPC server.")
self.connection_broken = True
Expand Down
59 changes: 58 additions & 1 deletion locust/test/test_runners.py
Expand Up @@ -47,6 +47,8 @@

NETWORK_BROKEN = "network broken"
BAD_MESSAGE = "bad message"
UNRECOGNIZED_HOST_MESSAGE = "unrecognized host message"
UNRECOGNIZED_MESSAGE = "unrecognized message"


def mocked_rpc(raise_on_close=True):
Expand Down Expand Up @@ -82,7 +84,11 @@ def recv_from_client(self):
if msg.data == NETWORK_BROKEN:
raise RPCError()
if msg.data == BAD_MESSAGE:
raise RPCReceiveError("Bad message")
raise RPCReceiveError(BAD_MESSAGE, addr=msg.node_id)
if msg.data == UNRECOGNIZED_HOST_MESSAGE:
raise RPCReceiveError(UNRECOGNIZED_HOST_MESSAGE, addr="FAKE")
if msg.data == UNRECOGNIZED_MESSAGE:
raise RPCReceiveError(UNRECOGNIZED_MESSAGE)
return msg.node_id, msg

def close(self, linger=None):
Expand Down Expand Up @@ -3040,6 +3046,57 @@ def my_task(self):
master.start(10, 10)
sleep(0.1)
server.mocked_send(Message("stats", BAD_MESSAGE, "zeh_fake_client1"))
self.assertEqual(4, len(server.outbox))

# Expected message order in outbox: ack, spawn, reconnect, ack
self.assertEqual(
"reconnect", server.outbox[2][1].type, "Master didn't send worker reconnect message when expected."
)

def test_worker_sends_unrecognized_message_to_master(self):
"""
Validate master ignores message from worker when it cannot parse adddress info.
"""

class TestUser(User):
@task
def my_task(self):
pass

with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner(user_classes=[TestUser])
server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1"))
self.assertEqual(1, len(master.clients))
self.assertTrue(
"zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict"
)

master.start(10, 10)
sleep(0.1)
server.mocked_send(Message("stats", UNRECOGNIZED_MESSAGE, "zeh_fake_client1"))
self.assertEqual(2, len(server.outbox))

def test_unknown_host_sends_message_to_master(self):
"""
Validate master ignores message that is sent from unknown host
"""

class TestUser(User):
@task
def my_task(self):
pass

with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner(user_classes=[TestUser])
server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1"))
self.assertEqual(1, len(master.clients))
self.assertTrue(
"zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict"
)

master.start(10, 10)
sleep(0.1)
server.mocked_send(Message("stats", UNRECOGNIZED_HOST_MESSAGE, "unknown_host"))
self.assertEqual(2, len(server.outbox))


Expand Down