From 9b776a141aa82c178c92c950732bb4fe643a9658 Mon Sep 17 00:00:00 2001 From: Adam Hopkins Date: Wed, 7 Dec 2022 21:55:05 +0200 Subject: [PATCH 1/4] Scale workers from inspector --- sanic/cli/app.py | 26 +++++++++--- sanic/cli/arguments.py | 6 +++ sanic/worker/inspector.py | 15 +++++-- sanic/worker/manager.py | 84 +++++++++++++++++++++++++++++++-------- sanic/worker/process.py | 4 +- 5 files changed, 109 insertions(+), 26 deletions(-) diff --git a/sanic/cli/app.py b/sanic/cli/app.py index 7b7eb6c616..24825e25de 100644 --- a/sanic/cli/app.py +++ b/sanic/cli/app.py @@ -98,16 +98,32 @@ def run(self, parse_args=None): except ValueError as e: error_logger.exception(f"Failed to run app: {e}") else: - if self.args.inspect or self.args.inspect_raw or self.args.trigger: + if ( + self.args.inspect + or self.args.inspect_raw + or self.args.trigger + or self.args.scale is not None + ): os.environ["SANIC_IGNORE_PRODUCTION_WARNING"] = "true" else: for http_version in self.args.http: app.prepare(**kwargs, version=http_version) - if self.args.inspect or self.args.inspect_raw or self.args.trigger: - action = self.args.trigger or ( - "raw" if self.args.inspect_raw else "pretty" - ) + if ( + self.args.inspect + or self.args.inspect_raw + or self.args.trigger + or self.args.scale is not None + ): + if self.args.scale is not None: + if self.args.scale <= 0: + error_logger.error("There must be at least 1 worker") + sys.exit(1) + action = f"scale={self.args.scale}" + else: + action = self.args.trigger or ( + "raw" if self.args.inspect_raw else "pretty" + ) inspect( app.config.INSPECTOR_HOST, app.config.INSPECTOR_PORT, diff --git a/sanic/cli/arguments.py b/sanic/cli/arguments.py index e1fe905adf..f01dda9b56 100644 --- a/sanic/cli/arguments.py +++ b/sanic/cli/arguments.py @@ -115,6 +115,12 @@ def attach(self): const="shutdown", help=("Trigger all processes to shutdown"), ) + group.add_argument( + "--scale", + dest="scale", + type=int, + help=("Scale number of workers"), + ) class HTTPVersionGroup(Group): diff --git a/sanic/worker/inspector.py b/sanic/worker/inspector.py index e49b6851f4..769bc784fd 100644 --- a/sanic/worker/inspector.py +++ b/sanic/worker/inspector.py @@ -55,17 +55,20 @@ def __call__(self) -> None: else: action = conn.recv(64) if action == b"reload": - conn.send(b"\n") self.reload() elif action == b"shutdown": - conn.send(b"\n") self.shutdown() + elif action.startswith(b"scale"): + num_workers = int(action.split(b"=", 1)[-1]) + logger.info("Scaling to %s", num_workers) + self.scale(num_workers) else: data = dumps(self.state_to_json()) conn.send(data.encode()) - conn.close() + conn.send(b"\n") + conn.close() finally: - logger.debug("Inspector closing") + logger.info("Inspector closing") sock.close() def stop(self, *_): @@ -80,6 +83,10 @@ def reload(self): message = "__ALL_PROCESSES__:" self._publisher.send(message) + def scale(self, num_workers: int): + message = f"__SCALE__:{num_workers}" + self._publisher.send(message) + def shutdown(self): message = "__TERMINATE__" self._publisher.send(message) diff --git a/sanic/worker/manager.py b/sanic/worker/manager.py index ad77a138cc..c88681fd31 100644 --- a/sanic/worker/manager.py +++ b/sanic/worker/manager.py @@ -1,8 +1,10 @@ import os +from itertools import count +from random import choice from signal import SIGINT, SIGTERM, Signals from signal import signal as signal_func -from typing import List, Optional +from typing import Dict, List, Optional from sanic.compat import OS_IS_WINDOWS from sanic.exceptions import ServerKilled @@ -30,33 +32,61 @@ def __init__( ): self.num_server = number self.context = context - self.transient: List[Worker] = [] - self.durable: List[Worker] = [] + self.transient: Dict[str, Worker] = {} + self.durable: Dict[str, Worker] = {} self.monitor_publisher, self.monitor_subscriber = monitor_pubsub self.worker_state = worker_state self.worker_state["Sanic-Main"] = {"pid": self.pid} self.terminated = False + self._serve = serve + self._server_settings = server_settings + self._server_count = count() if number == 0: raise RuntimeError("Cannot serve with no workers") - for i in range(number): - self.manage( - f"{WorkerProcess.SERVER_LABEL}-{i}", - serve, - server_settings, - transient=True, - ) + for _ in range(number): + self.create_server() signal_func(SIGINT, self.shutdown_signal) signal_func(SIGTERM, self.shutdown_signal) - def manage(self, ident, func, kwargs, transient=False): + def manage(self, ident, func, kwargs, transient=False) -> Worker: container = self.transient if transient else self.durable - container.append( - Worker(ident, func, kwargs, self.context, self.worker_state) + worker = Worker(ident, func, kwargs, self.context, self.worker_state) + container[worker.ident] = worker + return worker + + def create_server(self) -> Worker: + server_number = next(self._server_count) + return self.manage( + f"{WorkerProcess.SERVER_LABEL}-{server_number}", + self._serve, + self._server_settings, + transient=True, ) + def shutdown_server(self, ident: Optional[str] = None) -> None: + if not ident: + servers = [ + worker + for worker in self.transient.values() + if worker.ident.startswith(WorkerProcess.SERVER_LABEL) + ] + if not servers: + error_logger.error( + "Server shutdown failed because a server was not found." + ) + return + worker = choice(servers) + else: + worker = self.transient[ident] + + for process in worker.processes: + process.terminate() + + del self.transient[worker.ident] + def run(self): self.start() self.monitor() @@ -94,6 +124,25 @@ def restart(self, process_names: Optional[List[str]] = None, **kwargs): if not process_names or process.name in process_names: process.restart(**kwargs) + def scale(self, num_worker: int): + change = num_worker - self.num_server + if change == 0: + logger.info( + f"No change needed. There are already {num_worker} workers." + ) + return + + logger.info(f"Scaling from {self.num_server} to {num_worker} workers") + if change > 0: + for _ in range(change): + worker = self.create_server() + for process in worker.processes: + process.start() + else: + for _ in range(abs(change)): + self.shutdown_server() + self.num_server = num_worker + def monitor(self): self.wait_for_ack() while True: @@ -109,6 +158,9 @@ def monitor(self): self.shutdown() break split_message = message.split(":", 1) + if message.startswith("__SCALE__"): + self.scale(int(split_message[-1])) + continue processes = split_message[0] reloaded_files = ( split_message[1] if len(split_message) > 1 else None @@ -161,8 +213,8 @@ def wait_for_ack(self): # no cov self.kill() @property - def workers(self): - return self.transient + self.durable + def workers(self) -> List[Worker]: + return list(self.transient.values()) + list(self.durable.values()) @property def processes(self): @@ -172,7 +224,7 @@ def processes(self): @property def transient_processes(self): - for worker in self.transient: + for worker in self.transient.values(): for process in worker.processes: yield process diff --git a/sanic/worker/process.py b/sanic/worker/process.py index e6a4d484fd..7301ca2c4f 100644 --- a/sanic/worker/process.py +++ b/sanic/worker/process.py @@ -133,6 +133,8 @@ def pid(self): class Worker: + WORKER_PREFIX = "Sanic-" + def __init__( self, ident: str, @@ -152,7 +154,7 @@ def __init__( def create_process(self) -> WorkerProcess: process = WorkerProcess( factory=self.context.Process, - name=f"Sanic-{self.ident}-{len(self.processes)}", + name=f"{self.WORKER_PREFIX}{self.ident}-{len(self.processes)}", target=self.serve, kwargs={**self.server_settings}, worker_state=self.worker_state, From a73843d773f6bd614747db2050cca2dde6847744 Mon Sep 17 00:00:00 2001 From: Adam Hopkins Date: Wed, 7 Dec 2022 21:57:24 +0200 Subject: [PATCH 2/4] Scale from multiplexer --- sanic/worker/multiplexer.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sanic/worker/multiplexer.py b/sanic/worker/multiplexer.py index f92e7e08cc..4618f36ccd 100644 --- a/sanic/worker/multiplexer.py +++ b/sanic/worker/multiplexer.py @@ -28,6 +28,10 @@ def restart(self, name: str = ""): reload = restart # no cov + def scale(self, num_workers: int): + message = f"__SCALE__:{num_workers}" + self._monitor_publisher.send(message) + def terminate(self, early: bool = False): message = "__TERMINATE_EARLY__" if early else "__TERMINATE__" self._monitor_publisher.send(message) From 7001ad19115d6bafe4d4e3fbc8d3764893c62653 Mon Sep 17 00:00:00 2001 From: Adam Hopkins Date: Mon, 12 Dec 2022 23:37:19 +0200 Subject: [PATCH 3/4] Add testing --- sanic/worker/manager.py | 3 + tests/worker/test_inspector.py | 25 ++++- tests/worker/test_manager.py | 167 ++++++++++++++++++------------- tests/worker/test_multiplexer.py | 5 + 4 files changed, 124 insertions(+), 76 deletions(-) diff --git a/sanic/worker/manager.py b/sanic/worker/manager.py index c88681fd31..a7d3b08a06 100644 --- a/sanic/worker/manager.py +++ b/sanic/worker/manager.py @@ -125,6 +125,9 @@ def restart(self, process_names: Optional[List[str]] = None, **kwargs): process.restart(**kwargs) def scale(self, num_worker: int): + if num_worker <= 0: + raise ValueError("Cannot scale to 0 workers.") + change = num_worker - self.num_server if change == 0: logger.info( diff --git a/tests/worker/test_inspector.py b/tests/worker/test_inspector.py index 59753b618c..0c9eb90654 100644 --- a/tests/worker/test_inspector.py +++ b/tests/worker/test_inspector.py @@ -74,7 +74,9 @@ def test_send_inspect_conn_refused(socket: Mock, sys: Mock, caplog): @patch("sanic.worker.inspector.configure_socket") -@pytest.mark.parametrize("action", (b"reload", b"shutdown", b"foo")) +@pytest.mark.parametrize( + "action", (b"reload", b"shutdown", b"scale=5", b"foo") +) def test_run_inspector(configure_socket: Mock, action: bytes): sock = Mock() conn = Mock() @@ -83,6 +85,7 @@ def test_run_inspector(configure_socket: Mock, action: bytes): inspector = Inspector(Mock(), {}, {}, "localhost", 9999) inspector.reload = Mock() # type: ignore inspector.shutdown = Mock() # type: ignore + inspector.scale = Mock() # type: ignore inspector.state_to_json = Mock(return_value="foo") # type: ignore def accept(): @@ -98,20 +101,26 @@ def accept(): ) conn.recv.assert_called_with(64) + conn.send.assert_called_with(b"\n") if action == b"reload": - conn.send.assert_called_with(b"\n") inspector.reload.assert_called() inspector.shutdown.assert_not_called() + inspector.scale.assert_not_called() inspector.state_to_json.assert_not_called() elif action == b"shutdown": - conn.send.assert_called_with(b"\n") inspector.reload.assert_not_called() inspector.shutdown.assert_called() + inspector.scale.assert_not_called() + inspector.state_to_json.assert_not_called() + elif action.startswith(b"scale"): + inspector.reload.assert_not_called() + inspector.shutdown.assert_not_called() + inspector.scale.assert_called_once_with(5) inspector.state_to_json.assert_not_called() else: - conn.send.assert_called_with(b'"foo"') inspector.reload.assert_not_called() inspector.shutdown.assert_not_called() + inspector.scale.assert_not_called() inspector.state_to_json.assert_called() @@ -165,3 +174,11 @@ def test_shutdown(): inspector.shutdown() publisher.send.assert_called_once_with("__TERMINATE__") + + +def test_scale(): + publisher = Mock() + inspector = Inspector(publisher, {}, {}, "", 0) + inspector.scale(3) + + publisher.send.assert_called_once_with("__SCALE__:3") diff --git a/tests/worker/test_manager.py b/tests/worker/test_manager.py index c377b26070..d17b9971d3 100644 --- a/tests/worker/test_manager.py +++ b/tests/worker/test_manager.py @@ -1,3 +1,4 @@ +from logging import ERROR, INFO from signal import SIGINT, SIGKILL from unittest.mock import Mock, call, patch @@ -14,14 +15,7 @@ def fake_serve(): def test_manager_no_workers(): message = "Cannot serve with no workers" with pytest.raises(RuntimeError, match=message): - WorkerManager( - 0, - fake_serve, - {}, - Mock(), - (Mock(), Mock()), - {}, - ) + WorkerManager(0, fake_serve, {}, Mock(), (Mock(), Mock()), {}) @patch("sanic.worker.process.os") @@ -30,14 +24,7 @@ def test_terminate(os_mock: Mock): process.pid = 1234 context = Mock() context.Process.return_value = process - manager = WorkerManager( - 1, - fake_serve, - {}, - context, - (Mock(), Mock()), - {}, - ) + manager = WorkerManager(1, fake_serve, {}, context, (Mock(), Mock()), {}) assert manager.terminated is False manager.terminate() assert manager.terminated is True @@ -51,14 +38,7 @@ def test_shutown(os_mock: Mock): process.is_alive.return_value = True context = Mock() context.Process.return_value = process - manager = WorkerManager( - 1, - fake_serve, - {}, - context, - (Mock(), Mock()), - {}, - ) + manager = WorkerManager(1, fake_serve, {}, context, (Mock(), Mock()), {}) manager.shutdown() os_mock.kill.assert_called_once_with(1234, SIGINT) @@ -69,14 +49,7 @@ def test_kill(os_mock: Mock): process.pid = 1234 context = Mock() context.Process.return_value = process - manager = WorkerManager( - 1, - fake_serve, - {}, - context, - (Mock(), Mock()), - {}, - ) + manager = WorkerManager(1, fake_serve, {}, context, (Mock(), Mock()), {}) with pytest.raises(ServerKilled): manager.kill() os_mock.kill.assert_called_once_with(1234, SIGKILL) @@ -87,14 +60,7 @@ def test_restart_all(): p2 = Mock() context = Mock() context.Process.side_effect = [p1, p2, p1, p2] - manager = WorkerManager( - 2, - fake_serve, - {}, - context, - (Mock(), Mock()), - {}, - ) + manager = WorkerManager(2, fake_serve, {}, context, (Mock(), Mock()), {}) assert len(list(manager.transient_processes)) manager.restart() p1.terminate.assert_called_once() @@ -136,14 +102,7 @@ def test_monitor_all(): sub.recv.side_effect = ["__ALL_PROCESSES__:", ""] context = Mock() context.Process.side_effect = [p1, p2] - manager = WorkerManager( - 2, - fake_serve, - {}, - context, - (Mock(), sub), - {}, - ) + manager = WorkerManager(2, fake_serve, {}, context, (Mock(), sub), {}) manager.restart = Mock() # type: ignore manager.wait_for_ack = Mock() # type: ignore manager.monitor() @@ -160,14 +119,7 @@ def test_monitor_all_with_files(): sub.recv.side_effect = ["__ALL_PROCESSES__:foo,bar", ""] context = Mock() context.Process.side_effect = [p1, p2] - manager = WorkerManager( - 2, - fake_serve, - {}, - context, - (Mock(), sub), - {}, - ) + manager = WorkerManager(2, fake_serve, {}, context, (Mock(), sub), {}) manager.restart = Mock() # type: ignore manager.wait_for_ack = Mock() # type: ignore manager.monitor() @@ -185,14 +137,7 @@ def test_monitor_one_process(): sub.recv.side_effect = [f"{p1.name}:foo,bar", ""] context = Mock() context.Process.side_effect = [p1, p2] - manager = WorkerManager( - 2, - fake_serve, - {}, - context, - (Mock(), sub), - {}, - ) + manager = WorkerManager(2, fake_serve, {}, context, (Mock(), sub), {}) manager.restart = Mock() # type: ignore manager.wait_for_ack = Mock() # type: ignore manager.monitor() @@ -204,16 +149,94 @@ def test_monitor_one_process(): def test_shutdown_signal(): pub = Mock() - manager = WorkerManager( - 1, - fake_serve, - {}, - Mock(), - (pub, Mock()), - {}, - ) + manager = WorkerManager(1, fake_serve, {}, Mock(), (pub, Mock()), {}) manager.shutdown = Mock() # type: ignore manager.shutdown_signal(SIGINT, None) pub.send.assert_called_with(None) manager.shutdown.assert_called_once_with() + + +def test_shutdown_servers(caplog): + p1 = Mock() + p1.pid = 1234 + context = Mock() + context.Process.side_effect = [p1] + pub = Mock() + manager = WorkerManager(1, fake_serve, {}, context, (pub, Mock()), {}) + + with patch("os.kill") as kill: + with caplog.at_level(ERROR): + manager.shutdown_server() + + kill.assert_called_once_with(1234, SIGINT) + kill.reset_mock() + + assert not caplog.record_tuples + + manager.shutdown_server() + + kill.assert_not_called() + + assert ( + "sanic.error", + ERROR, + "Server shutdown failed because a server was not found.", + ) in caplog.record_tuples + + +def test_shutdown_servers_named(): + p1 = Mock() + p1.pid = 1234 + p2 = Mock() + p2.pid = 6543 + context = Mock() + context.Process.side_effect = [p1, p2] + pub = Mock() + manager = WorkerManager(2, fake_serve, {}, context, (pub, Mock()), {}) + + with patch("os.kill") as kill: + with pytest.raises(KeyError): + manager.shutdown_server("foo") + manager.shutdown_server("Server-1") + + kill.assert_called_once_with(6543, SIGINT) + + +def test_scale(caplog): + p1 = Mock() + p1.pid = 1234 + p2 = Mock() + p2.pid = 3456 + p3 = Mock() + p3.pid = 5678 + context = Mock() + context.Process.side_effect = [p1, p2, p3] + pub = Mock() + manager = WorkerManager(1, fake_serve, {}, context, (pub, Mock()), {}) + + assert len(manager.transient) == 1 + + manager.scale(3) + assert len(manager.transient) == 3 + + with patch("os.kill") as kill: + manager.scale(2) + assert len(manager.transient) == 2 + + manager.scale(1) + assert len(manager.transient) == 1 + + kill.call_count == 2 + + with caplog.at_level(INFO): + manager.scale(1) + + assert ( + "sanic.root", + INFO, + "No change needed. There are already 1 workers.", + ) in caplog.record_tuples + + with pytest.raises(ValueError, match=r"Cannot scale to 0 workers\."): + manager.scale(0) diff --git a/tests/worker/test_multiplexer.py b/tests/worker/test_multiplexer.py index 5047ef7731..e0b1a3688d 100644 --- a/tests/worker/test_multiplexer.py +++ b/tests/worker/test_multiplexer.py @@ -108,6 +108,11 @@ def test_terminate(monitor_publisher: Mock, m: WorkerMultiplexer): monitor_publisher.send.assert_called_once_with("__TERMINATE__") +def test_scale(monitor_publisher: Mock, m: WorkerMultiplexer): + m.scale(99) + monitor_publisher.send.assert_called_once_with("__SCALE__:99") + + def test_properties( monitor_publisher: Mock, worker_state: Dict[str, Any], m: WorkerMultiplexer ): From 664cbe4242ceb64092eb5ecb7471f1da553ab6ea Mon Sep 17 00:00:00 2001 From: Adam Hopkins Date: Mon, 12 Dec 2022 23:45:55 +0200 Subject: [PATCH 4/4] Ignore security warning on choice --- sanic/worker/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sanic/worker/manager.py b/sanic/worker/manager.py index a7d3b08a06..3af9fa9733 100644 --- a/sanic/worker/manager.py +++ b/sanic/worker/manager.py @@ -78,7 +78,7 @@ def shutdown_server(self, ident: Optional[str] = None) -> None: "Server shutdown failed because a server was not found." ) return - worker = choice(servers) + worker = choice(servers) # nosec B311 else: worker = self.transient[ident]