Skip to content

Commit

Permalink
Merge pull request #2271 from marcinh/feature/rpc_receive_error
Browse files Browse the repository at this point in the history
Fix: Ask worker to reconnect if master gets a broken RPC message
  • Loading branch information
cyberw committed Dec 8, 2022
2 parents 54a0d1c + b86075a commit fde7da7
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 5 deletions.
4 changes: 4 additions & 0 deletions locust/exception.py
Expand Up @@ -69,6 +69,10 @@ class RPCReceiveError(Exception):
When raised from zmqrpc, client connection should be reestablished.
"""

def __init__(self, *args: object, addr=None) -> None:
super().__init__(*args)
self.addr = addr


class AuthCredentialsError(ValueError):
"""
Expand Down
2 changes: 1 addition & 1 deletion locust/rpc/zmqrpc.py
Expand Up @@ -49,7 +49,7 @@ def recv_from_client(self):
try:
msg = Message.unserialize(data[1])
except (UnicodeDecodeError, msgerr.ExtraData) as e:
raise RPCReceiveError("ZMQ interrupted or corrupted message") from e
raise RPCReceiveError("ZMQ interrupted or corrupted message", addr=addr) from e
return addr, msg

def close(self, linger=None):
Expand Down
18 changes: 15 additions & 3 deletions locust/runners.py
Expand Up @@ -988,9 +988,21 @@ def client_listener(self) -> NoReturn:
try:
client_id, msg = self.server.recv_from_client()
except RPCReceiveError as e:
# TODO: Add proper reconnect if https://github.com/zeromq/pyzmq/issues/1809 fixed
logger.error(f"Unrecognized message detected: {e}")
continue
client_id = e.addr

if client_id and client_id in self.clients:
logger.error(f"RPCError when receiving from client: {e}. Will reset client {client_id}.")
try:
self.server.send_to_client(Message("reconnect", None, client_id))
except Exception as error:
logger.error(f"Error sending reconnect message to worker: {error}. Will reset RPC server.")
self.connection_broken = True
gevent.sleep(FALLBACK_INTERVAL)
continue
else:
message = f"{e}" if not client_id else f"{e} from {client_id}"
logger.error(f"Unrecognized message detected: {message}")
continue
except RPCSendError as e:
logger.error(f"Error sending reconnect message to worker: {e}. Will reset RPC server.")
self.connection_broken = True
Expand Down
59 changes: 58 additions & 1 deletion locust/test/test_runners.py
Expand Up @@ -47,6 +47,8 @@

NETWORK_BROKEN = "network broken"
BAD_MESSAGE = "bad message"
UNRECOGNIZED_HOST_MESSAGE = "unrecognized host message"
UNRECOGNIZED_MESSAGE = "unrecognized message"


def mocked_rpc(raise_on_close=True):
Expand Down Expand Up @@ -82,7 +84,11 @@ def recv_from_client(self):
if msg.data == NETWORK_BROKEN:
raise RPCError()
if msg.data == BAD_MESSAGE:
raise RPCReceiveError("Bad message")
raise RPCReceiveError(BAD_MESSAGE, addr=msg.node_id)
if msg.data == UNRECOGNIZED_HOST_MESSAGE:
raise RPCReceiveError(UNRECOGNIZED_HOST_MESSAGE, addr="FAKE")
if msg.data == UNRECOGNIZED_MESSAGE:
raise RPCReceiveError(UNRECOGNIZED_MESSAGE)
return msg.node_id, msg

def close(self, linger=None):
Expand Down Expand Up @@ -3040,6 +3046,57 @@ def my_task(self):
master.start(10, 10)
sleep(0.1)
server.mocked_send(Message("stats", BAD_MESSAGE, "zeh_fake_client1"))
self.assertEqual(4, len(server.outbox))

# Expected message order in outbox: ack, spawn, reconnect, ack
self.assertEqual(
"reconnect", server.outbox[2][1].type, "Master didn't send worker reconnect message when expected."
)

def test_worker_sends_unrecognized_message_to_master(self):
"""
Validate master ignores message from worker when it cannot parse adddress info.
"""

class TestUser(User):
@task
def my_task(self):
pass

with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner(user_classes=[TestUser])
server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1"))
self.assertEqual(1, len(master.clients))
self.assertTrue(
"zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict"
)

master.start(10, 10)
sleep(0.1)
server.mocked_send(Message("stats", UNRECOGNIZED_MESSAGE, "zeh_fake_client1"))
self.assertEqual(2, len(server.outbox))

def test_unknown_host_sends_message_to_master(self):
"""
Validate master ignores message that is sent from unknown host
"""

class TestUser(User):
@task
def my_task(self):
pass

with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner(user_classes=[TestUser])
server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1"))
self.assertEqual(1, len(master.clients))
self.assertTrue(
"zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict"
)

master.start(10, 10)
sleep(0.1)
server.mocked_send(Message("stats", UNRECOGNIZED_HOST_MESSAGE, "unknown_host"))
self.assertEqual(2, len(server.outbox))


Expand Down

0 comments on commit fde7da7

Please sign in to comment.