From 44665e7f16065bf12d370cfc439e553c0de63f96 Mon Sep 17 00:00:00 2001 From: Ben Mather Date: Tue, 17 May 2022 10:05:21 +0100 Subject: [PATCH 1/8] Initial implementations of close and wait_closed --- uvicorn/server.py | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/uvicorn/server.py b/uvicorn/server.py index 494e2b658..8036ee631 100644 --- a/uvicorn/server.py +++ b/uvicorn/server.py @@ -55,6 +55,10 @@ def __init__(self, config: Config) -> None: self.force_exit = False self.last_notified = 0.0 + #: Set once immediately after all requests have been closed and shutdown + #: has completed. + self._shutdown_event = asyncio.Event() + def run(self, sockets: Optional[List[socket.socket]] = None) -> None: self.config.setup_event_loop() return asyncio.run(self.serve(sockets=sockets)) @@ -78,7 +82,7 @@ async def serve(self, sockets: Optional[List[socket.socket]] = None) -> None: if self.should_exit: return await self.main_loop() - await self.shutdown(sockets=sockets) + await self._shutdown(sockets=sockets) message = "Finished server process [%d]" color_message = "Finished server process [" + click.style("%d", fg="cyan") + "]" @@ -249,7 +253,7 @@ async def on_tick(self, counter: int) -> bool: return self.server_state.total_requests >= self.config.limit_max_requests return False - async def shutdown(self, sockets: Optional[List[socket.socket]] = None) -> None: + async def _shutdown(self, sockets: Optional[List[socket.socket]] = None) -> None: logger.info("Shutting down") # Stop accepting new connections. @@ -283,6 +287,27 @@ async def shutdown(self, sockets: Optional[List[socket.socket]] = None) -> None: if not self.force_exit: await self.lifespan.shutdown() + self._shutdown_event.set() + + def close(self, *, force_exit: bool = False) -> None: + """ + Asks the server, asynchronously, to initiate shutdown. + It should be safe to call this from a request handler. + """ + self.should_exit = True + if force_exit and not self.force_exit: + self.force_exit = True + + async def wait_closed(self) -> None: + """ + Blocks until the server is completely shutdown. + """ + await self._shutdown_event.wait() + + async def shutdown(self) -> None: + self.close() + await self.wait_closed() + def install_signal_handlers(self) -> None: if threading.current_thread() is not threading.main_thread(): # Signals can only be listened to from the main thread. From 2cd94568137d25aa7df5449e0966b461399f9bd9 Mon Sep 17 00:00:00 2001 From: Ben Mather Date: Tue, 17 May 2022 14:45:36 +0100 Subject: [PATCH 2/8] Introduce start_serving method --- uvicorn/server.py | 41 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 36 insertions(+), 5 deletions(-) diff --git a/uvicorn/server.py b/uvicorn/server.py index 8036ee631..f8a9fed14 100644 --- a/uvicorn/server.py +++ b/uvicorn/server.py @@ -55,6 +55,14 @@ def __init__(self, config: Config) -> None: self.force_exit = False self.last_notified = 0.0 + self._sockets: Optional[List[socket.socket]] = None + + self._main_task: Optional[asyncio.Task] = None + + #: Set once immediately after startup has completed and the server has + #: started listening for requests. + self._startup_event = asyncio.Event() + #: Set once immediately after all requests have been closed and shutdown #: has completed. self._shutdown_event = asyncio.Event() @@ -63,7 +71,7 @@ def run(self, sockets: Optional[List[socket.socket]] = None) -> None: self.config.setup_event_loop() return asyncio.run(self.serve(sockets=sockets)) - async def serve(self, sockets: Optional[List[socket.socket]] = None) -> None: + async def _main(self) -> None: process_id = os.getpid() config = self.config @@ -72,22 +80,44 @@ async def serve(self, sockets: Optional[List[socket.socket]] = None) -> None: self.lifespan = config.lifespan_class(config) - self.install_signal_handlers() - message = "Started server process [%d]" color_message = "Started server process [" + click.style("%d", fg="cyan") + "]" logger.info(message, process_id, extra={"color_message": color_message}) - await self.startup(sockets=sockets) + await self.startup(sockets=self._sockets) if self.should_exit: return await self.main_loop() - await self._shutdown(sockets=sockets) + await self._shutdown(sockets=self._sockets) message = "Finished server process [%d]" color_message = "Finished server process [" + click.style("%d", fg="cyan") + "]" logger.info(message, process_id, extra={"color_message": color_message}) + async def start_serving(self) -> None: + if self._main_task is not None: + return + self._main_task = asyncio.create_task(self._main()) + await self._startup_event.wait() + + async def serve(self, sockets: Optional[List[socket.socket]] = None) -> None: + if self._main_task is not None: + raise RuntimeError("cannot call serve on running server") + + if sockets is not None and self._sockets is not None: + raise RuntimeError("cannot override already provided sockets list") + + self._sockets = sockets + + self.install_signal_handlers() + try: + await self.start_serving() + await self.wait_closed() + + except asyncio.CancelledError: + self.close() + await self.wait_closed() + async def startup(self, sockets: list = None) -> None: await self.lifespan.startup() if self.lifespan.should_exit: @@ -174,6 +204,7 @@ def _share_socket(sock: socket.SocketType) -> socket.SocketType: pass self.started = True + self._startup_event.set() def _log_started_message(self, listeners: Sequence[socket.SocketType]) -> None: config = self.config From bf877950791de6499811691c33f5dcd643dad6bb Mon Sep 17 00:00:00 2001 From: Ben Mather Date: Tue, 17 May 2022 17:02:17 +0100 Subject: [PATCH 3/8] Stop waiting for startup or shutdown if server aborts --- uvicorn/server.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/uvicorn/server.py b/uvicorn/server.py index f8a9fed14..3afe84602 100644 --- a/uvicorn/server.py +++ b/uvicorn/server.py @@ -95,10 +95,22 @@ async def _main(self) -> None: logger.info(message, process_id, extra={"color_message": color_message}) async def start_serving(self) -> None: - if self._main_task is not None: - return - self._main_task = asyncio.create_task(self._main()) - await self._startup_event.wait() + """ + Starts the server running in a background task and blocks until startup + is either fully complete, or fails. + + Idempotent. Can be called multiple times without creating multiple + instances. + """ + if self._main_task is None: + self._main_task = asyncio.create_task(self._main()) + + # If the main task exits before the startup event is set it means that + # startup has failed. + await asyncio.wait( + [asyncio.create_task(self._startup_event.wait()), self._main_task], + return_when=asyncio.FIRST_COMPLETED, + ) async def serve(self, sockets: Optional[List[socket.socket]] = None) -> None: if self._main_task is not None: @@ -333,7 +345,10 @@ async def wait_closed(self) -> None: """ Blocks until the server is completely shutdown. """ - await self._shutdown_event.wait() + await asyncio.wait( + [asyncio.create_task(self._shutdown_event.wait()), self._main_task], + return_when=asyncio.FIRST_COMPLETED, + ) async def shutdown(self) -> None: self.close() From 402614439b1e76b79e89d949e0ee10a6074ea502 Mon Sep 17 00:00:00 2001 From: Ben Mather Date: Tue, 17 May 2022 17:17:46 +0100 Subject: [PATCH 4/8] Remove shutdown event in favour of just awaiting the main task --- uvicorn/server.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/uvicorn/server.py b/uvicorn/server.py index 3afe84602..472825a4a 100644 --- a/uvicorn/server.py +++ b/uvicorn/server.py @@ -63,10 +63,6 @@ def __init__(self, config: Config) -> None: #: started listening for requests. self._startup_event = asyncio.Event() - #: Set once immediately after all requests have been closed and shutdown - #: has completed. - self._shutdown_event = asyncio.Event() - def run(self, sockets: Optional[List[socket.socket]] = None) -> None: self.config.setup_event_loop() return asyncio.run(self.serve(sockets=sockets)) @@ -330,8 +326,6 @@ async def _shutdown(self, sockets: Optional[List[socket.socket]] = None) -> None if not self.force_exit: await self.lifespan.shutdown() - self._shutdown_event.set() - def close(self, *, force_exit: bool = False) -> None: """ Asks the server, asynchronously, to initiate shutdown. @@ -345,10 +339,9 @@ async def wait_closed(self) -> None: """ Blocks until the server is completely shutdown. """ - await asyncio.wait( - [asyncio.create_task(self._shutdown_event.wait()), self._main_task], - return_when=asyncio.FIRST_COMPLETED, - ) + if self._main_task is None: + raise RuntimeError("Server hasn't been started") + await self._main_task async def shutdown(self) -> None: self.close() From 2c17123d40cd7f485dcfe22ff2293d2346ad9172 Mon Sep 17 00:00:00 2001 From: Ben Mather Date: Wed, 18 May 2022 17:01:26 +0100 Subject: [PATCH 5/8] Make it possible to pass list of sockets into the constructor --- uvicorn/server.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/uvicorn/server.py b/uvicorn/server.py index 472825a4a..7dbf6d290 100644 --- a/uvicorn/server.py +++ b/uvicorn/server.py @@ -46,7 +46,9 @@ def __init__(self) -> None: class Server: - def __init__(self, config: Config) -> None: + def __init__( + self, config: Config, *, sockets: Optional[List[socket.socket]] = None + ) -> None: self.config = config self.server_state = ServerState() @@ -55,7 +57,7 @@ def __init__(self, config: Config) -> None: self.force_exit = False self.last_notified = 0.0 - self._sockets: Optional[List[socket.socket]] = None + self._sockets: Optional[List[socket.socket]] = sockets self._main_task: Optional[asyncio.Task] = None From 118213c6fe6bae52ce33eea2fde2de588d5b34f5 Mon Sep 17 00:00:00 2001 From: Ben Mather Date: Wed, 18 May 2022 17:07:15 +0100 Subject: [PATCH 6/8] Properly wait for server startup and shutdown in tests --- tests/utils.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/utils.py b/tests/utils.py index 47dd21353..8a6f83bc5 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,4 +1,3 @@ -import asyncio import os from contextlib import asynccontextmanager, contextmanager from pathlib import Path @@ -8,14 +7,12 @@ @asynccontextmanager async def run_server(config: Config, sockets=None): - server = Server(config=config) - cancel_handle = asyncio.ensure_future(server.serve(sockets=sockets)) - await asyncio.sleep(0.1) + server = Server(config=config, sockets=sockets) + await server.start_serving() try: yield server finally: await server.shutdown() - cancel_handle.cancel() @contextmanager From 42ed57433338052fa13209083008a8282b96818a Mon Sep 17 00:00:00 2001 From: Ben Mather Date: Wed, 18 May 2022 18:00:51 +0100 Subject: [PATCH 7/8] Defer constructing startup event until we are definitely in event loop --- uvicorn/server.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/uvicorn/server.py b/uvicorn/server.py index 7dbf6d290..6ebf08d67 100644 --- a/uvicorn/server.py +++ b/uvicorn/server.py @@ -61,9 +61,9 @@ def __init__( self._main_task: Optional[asyncio.Task] = None - #: Set once immediately after startup has completed and the server has - #: started listening for requests. - self._startup_event = asyncio.Event() + #: Created on demand and set once immediately after startup has + #: completed and the server has started listening for requests. + self._startup_event: Optional[asyncio.Event] = None def run(self, sockets: Optional[List[socket.socket]] = None) -> None: self.config.setup_event_loop() @@ -100,6 +100,12 @@ async def start_serving(self) -> None: Idempotent. Can be called multiple times without creating multiple instances. """ + if self._startup_event is None: + # We defer creating the startup event until start serving is called + # because there is no guarantee that the constructor will be called + # with an active loop. + self._startup_event = asyncio.Event() + if self._main_task is None: self._main_task = asyncio.create_task(self._main()) @@ -214,6 +220,8 @@ def _share_socket(sock: socket.SocketType) -> socket.SocketType: pass self.started = True + + assert self._startup_event is not None self._startup_event.set() def _log_started_message(self, listeners: Sequence[socket.SocketType]) -> None: From 79adcddcb073d8e2150c2b5167a31fff04290854 Mon Sep 17 00:00:00 2001 From: Ben Mather Date: Wed, 29 Jun 2022 09:43:21 +0100 Subject: [PATCH 8/8] Fix clobbering of sockets list from constructor Co-authored-by: Florimond Manca --- uvicorn/server.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/uvicorn/server.py b/uvicorn/server.py index 6ebf08d67..052a42549 100644 --- a/uvicorn/server.py +++ b/uvicorn/server.py @@ -120,10 +120,10 @@ async def serve(self, sockets: Optional[List[socket.socket]] = None) -> None: if self._main_task is not None: raise RuntimeError("cannot call serve on running server") - if sockets is not None and self._sockets is not None: - raise RuntimeError("cannot override already provided sockets list") - - self._sockets = sockets + if sockets is not None: + if self._sockets is not None: + raise RuntimeError("cannot override already provided sockets list") + self._sockets = sockets self.install_signal_handlers() try: