Skip to content

Commit

Permalink
Kill server early on worker error (#2610)
Browse files Browse the repository at this point in the history
  • Loading branch information
ahopkins committed Dec 7, 2022
1 parent 0909e94 commit f32437b
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 90 deletions.
4 changes: 4 additions & 0 deletions sanic/exceptions.py
Expand Up @@ -8,6 +8,10 @@ class RequestCancelled(CancelledError):
quiet = True


class ServerKilled(Exception):
...


class SanicException(Exception):
message: str = ""

Expand Down
6 changes: 6 additions & 0 deletions sanic/mixins/startup.py
Expand Up @@ -41,6 +41,7 @@
from sanic.application.state import ApplicationServerInfo, Mode, ServerStage
from sanic.base.meta import SanicMeta
from sanic.compat import OS_IS_WINDOWS, is_atty
from sanic.exceptions import ServerKilled
from sanic.helpers import Default
from sanic.http.constants import HTTP
from sanic.http.tls import get_ssl_context, process_to_context
Expand Down Expand Up @@ -740,6 +741,7 @@ def serve(
socks = []
sync_manager = Manager()
setup_ext(primary)
exit_code = 0
try:
primary_server_info.settings.pop("main_start", None)
primary_server_info.settings.pop("main_stop", None)
Expand Down Expand Up @@ -849,6 +851,8 @@ def serve(
trigger_events(ready, loop, primary)

manager.run()
except ServerKilled:
exit_code = 1
except BaseException:
kwargs = primary_server_info.settings
error_logger.exception(
Expand All @@ -874,6 +878,8 @@ def serve(
unix = kwargs.get("unix")
if unix:
remove_unix_socket(unix)
if exit_code:
os._exit(exit_code)

@classmethod
def serve_single(cls, primary: Optional[Sanic] = None) -> None:
Expand Down
36 changes: 30 additions & 6 deletions sanic/worker/manager.py
@@ -1,12 +1,11 @@
import os
import sys

from signal import SIGINT, SIGTERM, Signals
from signal import signal as signal_func
from time import sleep
from typing import List, Optional

from sanic.compat import OS_IS_WINDOWS
from sanic.exceptions import ServerKilled
from sanic.log import error_logger, logger
from sanic.worker.process import ProcessState, Worker, WorkerProcess

Expand All @@ -18,7 +17,7 @@


class WorkerManager:
THRESHOLD = 50
THRESHOLD = 300 # == 30 seconds

def __init__(
self,
Expand Down Expand Up @@ -130,13 +129,36 @@ def monitor(self):

def wait_for_ack(self): # no cov
misses = 0
message = (
"It seems that one or more of your workers failed to come "
"online in the allowed time. Sanic is shutting down to avoid a "
f"deadlock. The current threshold is {self.THRESHOLD / 10}s. "
"If this problem persists, please check out the documentation "
"___."
)
while not self._all_workers_ack():
sleep(0.1)
if self.monitor_subscriber.poll(0.1):
monitor_msg = self.monitor_subscriber.recv()
if monitor_msg != "__TERMINATE_EARLY__":
self.monitor_publisher.send(monitor_msg)
continue
misses = self.THRESHOLD
message = (
"One of your worker processes terminated before startup "
"was completed. Please solve any errors experienced "
"during startup. If you do not see an exception traceback "
"in your error logs, try running Sanic in in a single "
"process using --single-process or single_process=True. "
"Once you are confident that the server is able to start "
"without errors you can switch back to multiprocess mode."
)
misses += 1
if misses > self.THRESHOLD:
error_logger.error("Not all workers are ack. Shutting down.")
error_logger.error(
"Not all workers acknowledged a successful startup. "
"Shutting down.\n\n" + message
)
self.kill()
sys.exit(1)

@property
def workers(self):
Expand All @@ -156,7 +178,9 @@ def transient_processes(self):

def kill(self):
for process in self.processes:
logger.info("Killing %s [%s]", process.name, process.pid)
os.kill(process.pid, SIGKILL)
raise ServerKilled

def shutdown_signal(self, signal, frame):
logger.info("Received signal %s. Shutting down.", Signals(signal).name)
Expand Down
5 changes: 3 additions & 2 deletions sanic/worker/multiplexer.py
Expand Up @@ -28,8 +28,9 @@ def restart(self, name: str = ""):

reload = restart # no cov

def terminate(self):
self._monitor_publisher.send("__TERMINATE__")
def terminate(self, early: bool = False):
message = "__TERMINATE_EARLY__" if early else "__TERMINATE__"
self._monitor_publisher.send(message)

@property
def pid(self) -> int:
Expand Down
169 changes: 92 additions & 77 deletions sanic/worker/serve.py
@@ -1,6 +1,7 @@
import asyncio
import os
import socket
import warnings

from functools import partial
from multiprocessing.connection import Connection
Expand All @@ -10,6 +11,7 @@
from sanic.application.constants import ServerStage
from sanic.application.state import ApplicationServerInfo
from sanic.http.constants import HTTP
from sanic.log import error_logger
from sanic.models.server_types import Signal
from sanic.server.protocols.http_protocol import HttpProtocol
from sanic.server.runners import _serve_http_1, _serve_http_3
Expand Down Expand Up @@ -45,80 +47,93 @@ def worker_serve(
config=None,
passthru: Optional[Dict[str, Any]] = None,
):
from sanic import Sanic

if app_loader:
app = app_loader.load()
else:
app = Sanic.get_app(app_name)

app.refresh(passthru)
app.setup_loop()

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Hydrate server info if needed
if server_info:
for app_name, server_info_objects in server_info.items():
a = Sanic.get_app(app_name)
if not a.state.server_info:
a.state.server_info = []
for info in server_info_objects:
if not info.settings.get("app"):
info.settings["app"] = a
a.state.server_info.append(info)

if isinstance(ssl, dict):
cert_loader = CertLoader(ssl)
ssl = cert_loader.load(app)
for info in app.state.server_info:
info.settings["ssl"] = ssl

# When in a worker process, do some init
if os.environ.get("SANIC_WORKER_NAME"):
# Hydrate apps with any passed server info

if monitor_publisher is None:
raise RuntimeError("No restart publisher found in worker process")
if worker_state is None:
raise RuntimeError("No worker state found in worker process")

# Run secondary servers
apps = list(Sanic._app_registry.values())
app.before_server_start(partial(app._start_servers, apps=apps))
for a in apps:
a.multiplexer = WorkerMultiplexer(monitor_publisher, worker_state)

if app.debug:
loop.set_debug(app.debug)

app.asgi = False

if app.state.server_info:
primary_server_info = app.state.server_info[0]
primary_server_info.stage = ServerStage.SERVING
if config:
app.update_config(config)

if version is HTTP.VERSION_3:
return _serve_http_3(host, port, app, loop, ssl)
return _serve_http_1(
host,
port,
app,
ssl,
sock,
unix,
reuse_port,
loop,
protocol,
backlog,
register_sys_signals,
run_multiple,
run_async,
connections,
signal,
state,
asyncio_server_kwargs,
)
try:
from sanic import Sanic

if app_loader:
app = app_loader.load()
else:
app = Sanic.get_app(app_name)

app.refresh(passthru)
app.setup_loop()

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Hydrate server info if needed
if server_info:
for app_name, server_info_objects in server_info.items():
a = Sanic.get_app(app_name)
if not a.state.server_info:
a.state.server_info = []
for info in server_info_objects:
if not info.settings.get("app"):
info.settings["app"] = a
a.state.server_info.append(info)

if isinstance(ssl, dict):
cert_loader = CertLoader(ssl)
ssl = cert_loader.load(app)
for info in app.state.server_info:
info.settings["ssl"] = ssl

# When in a worker process, do some init
if os.environ.get("SANIC_WORKER_NAME"):
# Hydrate apps with any passed server info

if monitor_publisher is None:
raise RuntimeError(
"No restart publisher found in worker process"
)
if worker_state is None:
raise RuntimeError("No worker state found in worker process")

# Run secondary servers
apps = list(Sanic._app_registry.values())
app.before_server_start(partial(app._start_servers, apps=apps))
for a in apps:
a.multiplexer = WorkerMultiplexer(
monitor_publisher, worker_state
)

if app.debug:
loop.set_debug(app.debug)

app.asgi = False

if app.state.server_info:
primary_server_info = app.state.server_info[0]
primary_server_info.stage = ServerStage.SERVING
if config:
app.update_config(config)

if version is HTTP.VERSION_3:
return _serve_http_3(host, port, app, loop, ssl)
return _serve_http_1(
host,
port,
app,
ssl,
sock,
unix,
reuse_port,
loop,
protocol,
backlog,
register_sys_signals,
run_multiple,
run_async,
connections,
signal,
state,
asyncio_server_kwargs,
)
except Exception as e:
warnings.simplefilter("ignore", category=RuntimeWarning)
if monitor_publisher:
error_logger.exception(e)
multiplexer = WorkerMultiplexer(monitor_publisher, {})
multiplexer.terminate(True)
else:
raise e
3 changes: 2 additions & 1 deletion tests/test_unix_socket.py
Expand Up @@ -2,7 +2,7 @@
import logging
import os

from asyncio import AbstractEventLoop
from asyncio import AbstractEventLoop, sleep
from string import ascii_lowercase

import httpcore
Expand Down Expand Up @@ -179,6 +179,7 @@ async def client(app: Sanic, loop: AbstractEventLoop):
assert r.status_code == 200
assert r.text == os.path.abspath(SOCKPATH)
finally:
await sleep(0.2)
app.stop()


Expand Down
4 changes: 3 additions & 1 deletion tests/worker/test_manager.py
Expand Up @@ -3,6 +3,7 @@

import pytest

from sanic.exceptions import ServerKilled
from sanic.worker.manager import WorkerManager


Expand Down Expand Up @@ -76,7 +77,8 @@ def test_kill(os_mock: Mock):
(Mock(), Mock()),
{},
)
manager.kill()
with pytest.raises(ServerKilled):
manager.kill()
os_mock.kill.assert_called_once_with(1234, SIGKILL)


Expand Down
12 changes: 9 additions & 3 deletions tests/worker/test_worker_serve.py
@@ -1,3 +1,5 @@
import logging

from os import environ
from unittest.mock import Mock, patch

Expand Down Expand Up @@ -37,16 +39,20 @@ def test_config_app(mock_app: Mock):
mock_app.update_config.assert_called_once_with({"FOO": "BAR"})


def test_bad_process(mock_app: Mock):
def test_bad_process(mock_app: Mock, caplog):
environ["SANIC_WORKER_NAME"] = "FOO"

message = "No restart publisher found in worker process"
with pytest.raises(RuntimeError, match=message):
worker_serve(**args(mock_app))

message = "No worker state found in worker process"
with pytest.raises(RuntimeError, match=message):
worker_serve(**args(mock_app, monitor_publisher=Mock()))
publisher = Mock()
with caplog.at_level(logging.ERROR):
worker_serve(**args(mock_app, monitor_publisher=publisher))

assert ("sanic.error", logging.ERROR, message) in caplog.record_tuples
publisher.send.assert_called_once_with("__TERMINATE_EARLY__")

del environ["SANIC_WORKER_NAME"]

Expand Down

0 comments on commit f32437b

Please sign in to comment.