Backport PR jupyter-server#792: Centralize app cleanup
blink1073 committed Apr 29, 2022
1 parent 6791cf9 commit 91d3229
Showing 9 changed files with 71 additions and 161 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/python-tests.yml
@@ -137,6 +137,8 @@ jobs:
     steps:
       - uses: jupyterlab/maintainer-tools/.github/actions/base-setup@v1
       - uses: jupyterlab/maintainer-tools/.github/actions/test-sdist@v1
+        with:
+          test_command: pytest --vv || pytest -vv --lf

   python_tests_check:  # This job does nothing and is only used for the branch protection
     if: always()
46 changes: 7 additions & 39 deletions jupyter_server/pytest_plugin.py
@@ -299,10 +299,7 @@ def jp_ensure_app_fixture(request):
 @pytest.fixture(scope="function")
 def jp_serverapp(jp_ensure_app_fixture, jp_server_config, jp_argv, jp_configurable_serverapp):
     """Starts a Jupyter Server instance based on the established configuration values."""
-    app = jp_configurable_serverapp(config=jp_server_config, argv=jp_argv)
-    yield app
-    app.remove_server_info_file()
-    app.remove_browser_open_files()
+    return jp_configurable_serverapp(config=jp_server_config, argv=jp_argv)


 @pytest.fixture
@@ -463,48 +460,19 @@ def inner(nbpath):


 @pytest.fixture(autouse=True)
-def jp_server_cleanup():
+def jp_server_cleanup(io_loop):
     yield
+    app: ServerApp = ServerApp.instance()
+    loop = io_loop.asyncio_loop
+    loop.run_until_complete(app._cleanup())
     ServerApp.clear_instance()


 @pytest.fixture
 def jp_cleanup_subprocesses(jp_serverapp):
-    """Clean up subprocesses started by a Jupyter Server, i.e. kernels and terminal."""
+    """DEPRECATED: The jp_server_cleanup fixture automatically cleans up the singleton ServerApp class"""

     async def _():
-        terminal_cleanup = jp_serverapp.web_app.settings["terminal_manager"].terminate_all
-        kernel_cleanup = jp_serverapp.kernel_manager.shutdown_all
-
-        async def kernel_cleanup_steps():
-            # Try a graceful shutdown with a timeout
-            try:
-                await asyncio.wait_for(kernel_cleanup(), timeout=15.0)
-            except asyncio.TimeoutError:
-                # Now force a shutdown
-                try:
-                    await asyncio.wait_for(kernel_cleanup(now=True), timeout=15.0)
-                except asyncio.TimeoutError:
-                    print(Exception("Kernel never shutdown!"))
-            except Exception as e:
-                print(e)
-
-        if asyncio.iscoroutinefunction(terminal_cleanup):
-            try:
-                await terminal_cleanup()
-            except Exception as e:
-                print(e)
-        else:
-            try:
-                await terminal_cleanup()
-            except Exception as e:
-                print(e)
-        if asyncio.iscoroutinefunction(kernel_cleanup):
-            await kernel_cleanup_steps()
-        else:
-            try:
-                kernel_cleanup()
-            except Exception as e:
-                print(e)
+        pass

     return _
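
A minimal sketch of a downstream test after this change, assuming the pytest plugin above is installed; the test name and body here are illustrative and not part of this commit. The autouse jp_server_cleanup fixture now runs ServerApp._cleanup() on the io_loop after each test, so a test only requests jp_serverapp or jp_fetch and no longer awaits jp_cleanup_subprocesses:

import json

import pytest


@pytest.mark.timeout(60)
async def test_start_kernel(jp_fetch, jp_serverapp):
    # Start a kernel through the REST API.
    r = await jp_fetch("api", "kernels", method="POST", allow_nonstandard_methods=True)
    kernel = json.loads(r.body.decode())
    assert "id" in kernel
    # No explicit teardown: the autouse jp_server_cleanup fixture calls
    # ServerApp._cleanup() once the test completes.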
9 changes: 8 additions & 1 deletion jupyter_server/serverapp.py
@@ -2427,6 +2427,8 @@ async def cleanup_kernels(self):
         The kernels will shutdown themselves when this process no longer exists,
         but explicit shutdown allows the KernelManagers to cleanup the connection files.
         """
+        if not getattr(self, "kernel_manager", None):
+            return
         n_kernels = len(self.kernel_manager.list_kernel_ids())
         kernel_msg = trans.ngettext(
             "Shutting down %d kernel", "Shutting down %d kernels", n_kernels
@@ -2453,6 +2455,8 @@ async def cleanup_terminals(self):

     async def cleanup_extensions(self):
         """Call shutdown hooks in all extensions."""
+        if not getattr(self, "extension_manager", None):
+            return
         n_extensions = len(self.extension_manager.extension_apps)
         extension_msg = trans.ngettext(
             "Shutting down %d extension", "Shutting down %d extensions", n_extensions
@@ -2732,6 +2736,8 @@ async def _cleanup(self):
         await self.cleanup_extensions()
         await self.cleanup_kernels()
         await self.cleanup_terminals()
+        if getattr(self, "session_manager", None):
+            self.session_manager.close()

     def start_ioloop(self):
         """Start the IO Loop."""
@@ -2760,7 +2766,8 @@ def start(self):
     async def _stop(self):
         """Cleanup resources and stop the IO Loop."""
         await self._cleanup()
-        self.io_loop.stop()
+        if getattr(self, "io_loop", None):
+            self.io_loop.stop()

     def stop(self, from_signal=False):
         """Cleanup resources and stop the server."""
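
The getattr guards added above make _cleanup() safe to call on a partially initialized application, which is what allows the test fixture to clean up unconditionally. A self-contained sketch of the same defensive pattern, using a made-up ExampleApp rather than jupyter_server code:

import asyncio


class ExampleApp:
    def init_managers(self):
        # Stand-ins for the managers a fully started app would own.
        self.kernel_manager = object()
        self.session_manager = object()

    async def _cleanup(self):
        # Only touch helpers that were actually created, so cleanup also
        # works for an app that failed partway through startup.
        if getattr(self, "kernel_manager", None):
            print("shutting down kernels")
        if getattr(self, "session_manager", None):
            print("closing session manager")


# An app that never ran init_managers() still cleans up without raising.
asyncio.run(ExampleApp()._cleanup())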
38 changes: 26 additions & 12 deletions jupyter_server/services/kernels/kernelmanager.py
@@ -149,6 +149,7 @@ def _default_kernel_buffers(self):

     def __init__(self, **kwargs):
         self.pinned_superclass = MultiKernelManager
+        self._pending_kernel_tasks = {}
         self.pinned_superclass.__init__(self, **kwargs)
         self.last_kernel_activity = utcnow()

@@ -216,9 +217,11 @@ async def start_kernel(self, kernel_id=None, path=None, **kwargs):
             kwargs["kernel_id"] = kernel_id
         kernel_id = await ensure_async(self.pinned_superclass.start_kernel(self, **kwargs))
         self._kernel_connections[kernel_id] = 0
-        fut = asyncio.ensure_future(self._finish_kernel_start(kernel_id))
+        task = asyncio.create_task(self._finish_kernel_start(kernel_id))
         if not getattr(self, "use_pending_kernels", None):
-            await fut
+            await task
+        else:
+            self._pending_kernel_tasks[kernel_id] = task
         # add busy/activity markers:
         kernel = self.get_kernel(kernel_id)
         kernel.execution_state = "starting"
@@ -245,8 +248,8 @@ async def _finish_kernel_start(self, kernel_id):
         if hasattr(km, "ready"):
             try:
                 await km.ready
-            except Exception:
-                self.log.exception(km.ready.exception())
+            except Exception as e:
+                self.log.exception(e)
                 return

         self._kernel_ports[kernel_id] = km.ports
@@ -372,7 +375,7 @@ def stop_buffering(self, kernel_id):
         buffer_info = self._kernel_buffers.pop(kernel_id)
         # close buffering streams
         for stream in buffer_info["channels"].values():
-            if not stream.closed():
+            if not stream.socket.closed:
                 stream.on_recv(None)
                 stream.close()

@@ -387,13 +390,18 @@ def stop_buffering(self, kernel_id):
     def shutdown_kernel(self, kernel_id, now=False, restart=False):
         """Shutdown a kernel by kernel_id"""
         self._check_kernel_id(kernel_id)
-        self.stop_watching_activity(kernel_id)
-        self.stop_buffering(kernel_id)

         # Decrease the metric of number of kernels
         # running for the relevant kernel type by 1
         KERNEL_CURRENTLY_RUNNING_TOTAL.labels(type=self._kernels[kernel_id].kernel_name).dec()

+        if kernel_id in self._pending_kernel_tasks:
+            task = self._pending_kernel_tasks.pop(kernel_id)
+            task.cancel()
+        else:
+            self.stop_watching_activity(kernel_id)
+            self.stop_buffering(kernel_id)
+
         self.pinned_superclass.shutdown_kernel(self, kernel_id, now=now, restart=restart)

     async def restart_kernel(self, kernel_id, now=False):
@@ -533,7 +541,8 @@ def stop_watching_activity(self, kernel_id):
         """Stop watching IOPub messages on a kernel for activity."""
         kernel = self._kernels[kernel_id]
         if getattr(kernel, "_activity_stream", None):
-            kernel._activity_stream.close()
+            if not kernel._activity_stream.socket.closed:
+                kernel._activity_stream.close()
             kernel._activity_stream = None

     def initialize_culler(self):
@@ -638,19 +647,24 @@ def __init__(self, **kwargs):
         self.pinned_superclass = AsyncMultiKernelManager
         self.pinned_superclass.__init__(self, **kwargs)
         self.last_kernel_activity = utcnow()
+        self._pending_kernel_tasks = {}

     async def shutdown_kernel(self, kernel_id, now=False, restart=False):
         """Shutdown a kernel by kernel_id"""
         self._check_kernel_id(kernel_id)
-        self.stop_watching_activity(kernel_id)
-        self.stop_buffering(kernel_id)

         # Decrease the metric of number of kernels
         # running for the relevant kernel type by 1
         KERNEL_CURRENTLY_RUNNING_TOTAL.labels(type=self._kernels[kernel_id].kernel_name).dec()

+        if kernel_id in self._pending_kernel_tasks:
+            task = self._pending_kernel_tasks.pop(kernel_id)
+            task.cancel()
+        else:
+            self.stop_watching_activity(kernel_id)
+            self.stop_buffering(kernel_id)
+
         # Finish shutting down the kernel before clearing state to avoid a race condition.
-        ret = await self.pinned_superclass.shutdown_kernel(
+        return await self.pinned_superclass.shutdown_kernel(
             self, kernel_id, now=now, restart=restart
         )
-        return ret
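
Both kernel manager classes now store the asyncio task for a pending kernel start in _pending_kernel_tasks and cancel it if the kernel is shut down before startup finishes. A self-contained sketch of that bookkeeping; PendingStartTracker and slow_start are illustrative names, not jupyter_server API:

import asyncio


class PendingStartTracker:
    def __init__(self):
        self._pending_kernel_tasks = {}

    def start(self, kernel_id, coro):
        # Remember the in-flight startup so it can be cancelled later.
        self._pending_kernel_tasks[kernel_id] = asyncio.create_task(coro)

    def shutdown(self, kernel_id):
        # Mirrors shutdown_kernel above: a start that is still pending is
        # cancelled instead of tearing down activity watchers and buffers.
        if kernel_id in self._pending_kernel_tasks:
            self._pending_kernel_tasks.pop(kernel_id).cancel()


async def slow_start():
    await asyncio.sleep(10)  # pretend the kernel takes a while to come up


async def main():
    tracker = PendingStartTracker()
    tracker.start("abc123", slow_start())
    tracker.shutdown("abc123")  # cancels the pending start right away
    await asyncio.sleep(0)      # give the cancellation a chance to propagate
    assert not tracker._pending_kernel_tasks


asyncio.run(main())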
3 changes: 0 additions & 3 deletions tests/auth/test_authorizer.py
@@ -205,7 +205,6 @@ async def test_authorized_requests(
     send_request,
     tmp_path,
     jp_serverapp,
-    jp_cleanup_subprocesses,
     method,
     url,
     body,
@@ -275,5 +274,3 @@

     code = await send_request(url, body=body, method=method)
     assert code in expected_codes
-
-    await jp_cleanup_subprocesses()
22 changes: 6 additions & 16 deletions tests/services/kernels/test_api.py
@@ -67,7 +67,7 @@ async def test_no_kernels(jp_fetch):


 @pytest.mark.timeout(TEST_TIMEOUT)
-async def test_default_kernels(jp_fetch, jp_base_url, jp_cleanup_subprocesses):
+async def test_default_kernels(jp_fetch, jp_base_url):
     r = await jp_fetch("api", "kernels", method="POST", allow_nonstandard_methods=True)
     kernel = json.loads(r.body.decode())
     assert r.headers["location"] == url_path_join(jp_base_url, "/api/kernels/", kernel["id"])
@@ -79,13 +79,10 @@ async def test_default_kernels(jp_fetch, jp_base_url, jp_cleanup_subprocesses):
         ["frame-ancestors 'self'", "report-uri " + report_uri, "default-src 'none'"]
     )
     assert r.headers["Content-Security-Policy"] == expected_csp
-    await jp_cleanup_subprocesses()


 @pytest.mark.timeout(TEST_TIMEOUT)
-async def test_main_kernel_handler(
-    jp_fetch, jp_base_url, jp_cleanup_subprocesses, jp_serverapp, pending_kernel_is_ready
-):
+async def test_main_kernel_handler(jp_fetch, jp_base_url, jp_serverapp, pending_kernel_is_ready):
     # Start the first kernel
     r = await jp_fetch(
         "api", "kernels", method="POST", body=json.dumps({"name": NATIVE_KERNEL_NAME})
@@ -158,11 +155,10 @@ async def test_main_kernel_handler(
     )
     kernel3 = json.loads(r.body.decode())
     assert isinstance(kernel3, dict)
-    await jp_cleanup_subprocesses()


 @pytest.mark.timeout(TEST_TIMEOUT)
-async def test_kernel_handler(jp_fetch, jp_cleanup_subprocesses, pending_kernel_is_ready):
+async def test_kernel_handler(jp_fetch, jp_serverapp, pending_kernel_is_ready):
     # Create a kernel
     r = await jp_fetch(
         "api", "kernels", method="POST", body=json.dumps({"name": NATIVE_KERNEL_NAME})
@@ -206,13 +202,10 @@ async def test_kernel_handler(jp_fetch, jp_cleanup_subprocesses, pending_kernel_
     with pytest.raises(tornado.httpclient.HTTPClientError) as e:
         await jp_fetch("api", "kernels", bad_id, method="DELETE")
     assert expected_http_error(e, 404, "Kernel does not exist: " + bad_id)
-    await jp_cleanup_subprocesses()


 @pytest.mark.timeout(TEST_TIMEOUT)
-async def test_kernel_handler_startup_error(
-    jp_fetch, jp_cleanup_subprocesses, jp_serverapp, jp_kernelspecs
-):
+async def test_kernel_handler_startup_error(jp_fetch, jp_serverapp, jp_kernelspecs):
     if getattr(jp_serverapp.kernel_manager, "use_pending_kernels", False):
         return

@@ -223,7 +216,7 @@

 @pytest.mark.timeout(TEST_TIMEOUT)
 async def test_kernel_handler_startup_error_pending(
-    jp_fetch, jp_ws_fetch, jp_cleanup_subprocesses, jp_serverapp, jp_kernelspecs
+    jp_fetch, jp_ws_fetch, jp_serverapp, jp_kernelspecs
 ):
     if not getattr(jp_serverapp.kernel_manager, "use_pending_kernels", False):
         return
@@ -238,9 +231,7 @@


 @pytest.mark.timeout(TEST_TIMEOUT)
-async def test_connection(
-    jp_fetch, jp_ws_fetch, jp_http_port, jp_auth_header, jp_cleanup_subprocesses
-):
+async def test_connection(jp_fetch, jp_ws_fetch, jp_http_port, jp_auth_header):
     # Create kernel
     r = await jp_fetch(
         "api", "kernels", method="POST", body=json.dumps({"name": NATIVE_KERNEL_NAME})
@@ -274,4 +265,3 @@ async def test_connection(
     r = await jp_fetch("api", "kernels", kid, method="GET")
     model = json.loads(r.body.decode())
     assert model["connections"] == 0
-    await jp_cleanup_subprocesses()
8 changes: 2 additions & 6 deletions tests/services/kernels/test_cull.py
@@ -43,7 +43,7 @@
         ),
     ],
 )
-async def test_cull_idle(jp_fetch, jp_ws_fetch, jp_cleanup_subprocesses):
+async def test_cull_idle(jp_fetch, jp_ws_fetch):
     r = await jp_fetch("api", "kernels", method="POST", allow_nonstandard_methods=True)
     kernel = json.loads(r.body.decode())
     kid = kernel["id"]
Expand All @@ -59,7 +59,6 @@ async def test_cull_idle(jp_fetch, jp_ws_fetch, jp_cleanup_subprocesses):
ws.close()
culled = await get_cull_status(kid, jp_fetch) # not connected, should be culled
assert culled
await jp_cleanup_subprocesses()


# Pending kernels was released in Jupyter Client 7.1
@@ -89,9 +88,7 @@ async def test_cull_idle(jp_fetch, jp_ws_fetch, jp_cleanup_subprocesses):
     ],
 )
 @pytest.mark.timeout(30)
-async def test_cull_dead(
-    jp_fetch, jp_ws_fetch, jp_serverapp, jp_cleanup_subprocesses, jp_kernelspecs
-):
+async def test_cull_dead(jp_fetch, jp_ws_fetch, jp_serverapp, jp_kernelspecs):
     r = await jp_fetch("api", "kernels", method="POST", allow_nonstandard_methods=True)
     kernel = json.loads(r.body.decode())
     kid = kernel["id"]
Expand All @@ -105,7 +102,6 @@ async def test_cull_dead(
assert model["connections"] == 0
culled = await get_cull_status(kid, jp_fetch) # connected, should not be culled
assert culled
await jp_cleanup_subprocesses()


async def get_cull_status(kid, jp_fetch):
Expand Down
