Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kill server early on worker error #2610

Merged
merged 7 commits into from Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions sanic/exceptions.py
Expand Up @@ -8,6 +8,10 @@ class RequestCancelled(CancelledError):
quiet = True


class ServerKilled(Exception):
    """Signal that the server's processes were forcibly killed.

    Raised by ``WorkerManager.kill`` after it has sent the kill signal to
    its processes, and caught in ``serve`` to set a non-zero exit code.
    """


class SanicException(Exception):
message: str = ""

Expand Down
6 changes: 6 additions & 0 deletions sanic/mixins/startup.py
Expand Up @@ -41,6 +41,7 @@
from sanic.application.state import ApplicationServerInfo, Mode, ServerStage
from sanic.base.meta import SanicMeta
from sanic.compat import OS_IS_WINDOWS, is_atty
from sanic.exceptions import ServerKilled
from sanic.helpers import Default
from sanic.http.constants import HTTP
from sanic.http.tls import get_ssl_context, process_to_context
Expand Down Expand Up @@ -740,6 +741,7 @@ def serve(
socks = []
sync_manager = Manager()
setup_ext(primary)
exit_code = 0
try:
primary_server_info.settings.pop("main_start", None)
primary_server_info.settings.pop("main_stop", None)
Expand Down Expand Up @@ -849,6 +851,8 @@ def serve(
trigger_events(ready, loop, primary)

manager.run()
except ServerKilled:
exit_code = 1
except BaseException:
kwargs = primary_server_info.settings
error_logger.exception(
Expand All @@ -874,6 +878,8 @@ def serve(
unix = kwargs.get("unix")
if unix:
remove_unix_socket(unix)
if exit_code:
os._exit(exit_code)

@classmethod
def serve_single(cls, primary: Optional[Sanic] = None) -> None:
Expand Down
35 changes: 29 additions & 6 deletions sanic/worker/manager.py
@@ -1,12 +1,11 @@
import os
import sys

from signal import SIGINT, SIGTERM, Signals
from signal import signal as signal_func
from time import sleep
from typing import List, Optional

from sanic.compat import OS_IS_WINDOWS
from sanic.exceptions import ServerKilled
from sanic.log import error_logger, logger
from sanic.worker.process import ProcessState, Worker, WorkerProcess

Expand All @@ -18,7 +17,7 @@


class WorkerManager:
THRESHOLD = 50
THRESHOLD = 300 # == 30 seconds
ahopkins marked this conversation as resolved.
Show resolved Hide resolved

def __init__(
self,
Expand Down Expand Up @@ -130,13 +129,35 @@ def monitor(self):

def wait_for_ack(self):  # no cov
    """Block until all workers acknowledge startup, or kill the server.

    Each pass through the loop waits up to 0.1s on the monitor
    subscriber. Messages other than ``"__TERMINATE_EARLY__"`` are
    forwarded to the monitor publisher and do not count as a miss. An
    early-termination message exhausts the remaining allowance right away
    so the shutdown happens on this pass.

    Once the number of misses exceeds ``THRESHOLD`` the reason is logged
    and ``self.kill()`` is called; that call is expected to raise
    ``ServerKilled``, so nothing is executed after it.
    """
    misses = 0
    message = (
        "It seems that one or more of your workers failed to come "
        "online in the allowed time. Sanic is shutting down to avoid a "
        f"deadlock. The current threshold is {self.THRESHOLD / 10}s.\n"
        "If this problem persists, please check out the documentation "
        "___."
    )
    while not self._all_workers_ack():
        # poll() is the only per-iteration delay: with a 0.1s timeout,
        # THRESHOLD misses correspond to THRESHOLD / 10 seconds.
        if self.monitor_subscriber.poll(0.1):
            monitor_msg = self.monitor_subscriber.recv()
            if monitor_msg != "__TERMINATE_EARLY__":
                self.monitor_publisher.send(monitor_msg)
                continue
            # A worker died during startup: skip the rest of the wait.
            misses = self.THRESHOLD
            message = (
                "One of your worker processes terminated before startup "
                "was completed. Please solve any errors experienced "
                "during startup. If you do not see an exception traceback "
                "in your error logs, try running Sanic in a single "
                "process using --single-process or single_process=True. "
                "Once you are confident that the server is able to start "
                "without errors you can switch back to multiprocess mode."
            )
        misses += 1
        if misses > self.THRESHOLD:
            error_logger.error(
                "Not all workers are ack. Shutting down.\n\n" + message
            )
            self.kill()

@property
def workers(self):
Expand All @@ -156,7 +177,9 @@ def transient_processes(self):

def kill(self):
    """Send ``SIGKILL`` to every managed process, then raise.

    Each process is logged by name and pid before it is signalled.

    Raises:
        ServerKilled: always, once the loop over ``self.processes`` ends.
    """
    for proc in self.processes:
        name = proc.name
        pid = proc.pid
        logger.info("Killing %s [%s]", name, pid)
        os.kill(pid, SIGKILL)
    raise ServerKilled()

def shutdown_signal(self, signal, frame):
logger.info("Received signal %s. Shutting down.", Signals(signal).name)
Expand Down
5 changes: 3 additions & 2 deletions sanic/worker/multiplexer.py
Expand Up @@ -28,8 +28,9 @@ def restart(self, name: str = ""):

reload = restart # no cov

def terminate(self, early: bool = False):
    """Publish a termination request on the monitor publisher.

    Args:
        early: When true, send ``"__TERMINATE_EARLY__"`` instead of the
            default ``"__TERMINATE__"`` message.
    """
    message = "__TERMINATE_EARLY__" if early else "__TERMINATE__"
    self._monitor_publisher.send(message)

@property
def pid(self) -> int:
Expand Down
169 changes: 92 additions & 77 deletions sanic/worker/serve.py
@@ -1,6 +1,7 @@
import asyncio
import os
import socket
import warnings

from functools import partial
from multiprocessing.connection import Connection
Expand All @@ -10,6 +11,7 @@
from sanic.application.constants import ServerStage
from sanic.application.state import ApplicationServerInfo
from sanic.http.constants import HTTP
from sanic.log import error_logger
from sanic.models.server_types import Signal
from sanic.server.protocols.http_protocol import HttpProtocol
from sanic.server.runners import _serve_http_1, _serve_http_3
Expand Down Expand Up @@ -45,80 +47,93 @@ def worker_serve(
config=None,
passthru: Optional[Dict[str, Any]] = None,
):
from sanic import Sanic

if app_loader:
app = app_loader.load()
else:
app = Sanic.get_app(app_name)

app.refresh(passthru)
app.setup_loop()

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Hydrate server info if needed
if server_info:
for app_name, server_info_objects in server_info.items():
a = Sanic.get_app(app_name)
if not a.state.server_info:
a.state.server_info = []
for info in server_info_objects:
if not info.settings.get("app"):
info.settings["app"] = a
a.state.server_info.append(info)

if isinstance(ssl, dict):
cert_loader = CertLoader(ssl)
ssl = cert_loader.load(app)
for info in app.state.server_info:
info.settings["ssl"] = ssl

# When in a worker process, do some init
if os.environ.get("SANIC_WORKER_NAME"):
# Hydrate apps with any passed server info

if monitor_publisher is None:
raise RuntimeError("No restart publisher found in worker process")
if worker_state is None:
raise RuntimeError("No worker state found in worker process")

# Run secondary servers
apps = list(Sanic._app_registry.values())
app.before_server_start(partial(app._start_servers, apps=apps))
for a in apps:
a.multiplexer = WorkerMultiplexer(monitor_publisher, worker_state)

if app.debug:
loop.set_debug(app.debug)

app.asgi = False

if app.state.server_info:
primary_server_info = app.state.server_info[0]
primary_server_info.stage = ServerStage.SERVING
if config:
app.update_config(config)

if version is HTTP.VERSION_3:
return _serve_http_3(host, port, app, loop, ssl)
return _serve_http_1(
host,
port,
app,
ssl,
sock,
unix,
reuse_port,
loop,
protocol,
backlog,
register_sys_signals,
run_multiple,
run_async,
connections,
signal,
state,
asyncio_server_kwargs,
)
try:
from sanic import Sanic

if app_loader:
app = app_loader.load()
else:
app = Sanic.get_app(app_name)

app.refresh(passthru)
app.setup_loop()

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Hydrate server info if needed
if server_info:
for app_name, server_info_objects in server_info.items():
a = Sanic.get_app(app_name)
if not a.state.server_info:
a.state.server_info = []
for info in server_info_objects:
if not info.settings.get("app"):
info.settings["app"] = a
a.state.server_info.append(info)

if isinstance(ssl, dict):
cert_loader = CertLoader(ssl)
ssl = cert_loader.load(app)
for info in app.state.server_info:
info.settings["ssl"] = ssl

# When in a worker process, do some init
if os.environ.get("SANIC_WORKER_NAME"):
# Hydrate apps with any passed server info

if monitor_publisher is None:
raise RuntimeError(
"No restart publisher found in worker process"
)
if worker_state is None:
raise RuntimeError("No worker state found in worker process")

# Run secondary servers
apps = list(Sanic._app_registry.values())
app.before_server_start(partial(app._start_servers, apps=apps))
for a in apps:
a.multiplexer = WorkerMultiplexer(
monitor_publisher, worker_state
)

if app.debug:
loop.set_debug(app.debug)

app.asgi = False

if app.state.server_info:
primary_server_info = app.state.server_info[0]
primary_server_info.stage = ServerStage.SERVING
if config:
app.update_config(config)

if version is HTTP.VERSION_3:
return _serve_http_3(host, port, app, loop, ssl)
return _serve_http_1(
host,
port,
app,
ssl,
sock,
unix,
reuse_port,
loop,
protocol,
backlog,
register_sys_signals,
run_multiple,
run_async,
connections,
signal,
state,
asyncio_server_kwargs,
)
except Exception as e:
warnings.simplefilter("ignore", category=RuntimeWarning)
if monitor_publisher:
error_logger.exception(e)
multiplexer = WorkerMultiplexer(monitor_publisher, {})
multiplexer.terminate(True)
else:
raise e
3 changes: 2 additions & 1 deletion tests/test_unix_socket.py
Expand Up @@ -2,7 +2,7 @@
import logging
import os

from asyncio import AbstractEventLoop
from asyncio import AbstractEventLoop, sleep
from string import ascii_lowercase

import httpcore
Expand Down Expand Up @@ -179,6 +179,7 @@ async def client(app: Sanic, loop: AbstractEventLoop):
assert r.status_code == 200
assert r.text == os.path.abspath(SOCKPATH)
finally:
await sleep(0.2)
sjsadowski marked this conversation as resolved.
Show resolved Hide resolved
app.stop()


Expand Down
4 changes: 3 additions & 1 deletion tests/worker/test_manager.py
Expand Up @@ -3,6 +3,7 @@

import pytest

from sanic.exceptions import ServerKilled
from sanic.worker.manager import WorkerManager


Expand Down Expand Up @@ -76,7 +77,8 @@ def test_kill(os_mock: Mock):
(Mock(), Mock()),
{},
)
manager.kill()
with pytest.raises(ServerKilled):
manager.kill()
os_mock.kill.assert_called_once_with(1234, SIGKILL)


Expand Down
12 changes: 9 additions & 3 deletions tests/worker/test_worker_serve.py
@@ -1,3 +1,5 @@
import logging

from os import environ
from unittest.mock import Mock, patch

Expand Down Expand Up @@ -37,16 +39,20 @@ def test_config_app(mock_app: Mock):
mock_app.update_config.assert_called_once_with({"FOO": "BAR"})


def test_bad_process(mock_app: Mock, caplog):
    """Check how ``worker_serve`` reports a misconfigured worker process.

    Without a monitor publisher the ``RuntimeError`` is raised to the
    caller. With a publisher but no worker state, the error is logged on
    ``sanic.error`` and ``"__TERMINATE_EARLY__"`` is sent instead.
    """
    environ["SANIC_WORKER_NAME"] = "FOO"
    try:
        message = "No restart publisher found in worker process"
        with pytest.raises(RuntimeError, match=message):
            worker_serve(**args(mock_app))

        message = "No worker state found in worker process"
        publisher = Mock()
        with caplog.at_level(logging.ERROR):
            worker_serve(**args(mock_app, monitor_publisher=publisher))

        assert (
            "sanic.error",
            logging.ERROR,
            message,
        ) in caplog.record_tuples
        publisher.send.assert_called_once_with("__TERMINATE_EARLY__")
    finally:
        # Always clear the marker so a failing assertion above cannot make
        # later tests believe they run inside a worker process.
        del environ["SANIC_WORKER_NAME"]

Expand Down