Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Centralize app cleanup #792

Merged
merged 9 commits into from Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/python-tests.yml
Expand Up @@ -137,6 +137,8 @@ jobs:
steps:
- uses: jupyterlab/maintainer-tools/.github/actions/base-setup@v1
- uses: jupyterlab/maintainer-tools/.github/actions/test-sdist@v1
with:
test_command: pytest --vv || pytest -vv --lf

python_tests_check: # This job does nothing and is only used for the branch protection
if: always()
Expand Down
51 changes: 7 additions & 44 deletions jupyter_server/pytest_plugin.py
Expand Up @@ -310,10 +310,7 @@ def jp_ensure_app_fixture(request):
@pytest.fixture(scope="function")
def jp_serverapp(jp_ensure_app_fixture, jp_server_config, jp_argv, jp_configurable_serverapp):
"""Starts a Jupyter Server instance based on the established configuration values."""
app = jp_configurable_serverapp(config=jp_server_config, argv=jp_argv)
yield app
app.remove_server_info_file()
app.remove_browser_open_files()
return jp_configurable_serverapp(config=jp_server_config, argv=jp_argv)


@pytest.fixture
Expand Down Expand Up @@ -474,53 +471,19 @@ def inner(nbpath):


@pytest.fixture(autouse=True)
def jp_server_cleanup():
def jp_server_cleanup(io_loop):
yield
app: ServerApp = ServerApp.instance()
loop = io_loop.asyncio_loop
loop.run_until_complete(app._cleanup())
ServerApp.clear_instance()


@pytest.fixture
def jp_cleanup_subprocesses(jp_serverapp):
"""Clean up subprocesses started by a Jupyter Server, i.e. kernels and terminal."""
"""DEPRECATED: The jp_server_cleanup fixture automatically cleans up the singleton ServerApp class"""

async def _():
term_manager = jp_serverapp.web_app.settings.get("terminal_manager")
if term_manager:
terminal_cleanup = term_manager.terminate_all
else:
terminal_cleanup = lambda: None # noqa

kernel_cleanup = jp_serverapp.kernel_manager.shutdown_all

async def kernel_cleanup_steps():
# Try a graceful shutdown with a timeout
try:
await asyncio.wait_for(kernel_cleanup(), timeout=15.0)
except asyncio.TimeoutError:
# Now force a shutdown
try:
await asyncio.wait_for(kernel_cleanup(now=True), timeout=15.0)
except asyncio.TimeoutError:
print(Exception("Kernel never shutdown!"))
except Exception as e:
print(e)

if asyncio.iscoroutinefunction(terminal_cleanup):
try:
await terminal_cleanup()
except Exception as e:
print(e)
else:
try:
terminal_cleanup()
except Exception as e:
print(e)
if asyncio.iscoroutinefunction(kernel_cleanup):
await kernel_cleanup_steps()
else:
try:
kernel_cleanup()
except Exception as e:
print(e)
pass

return _
9 changes: 8 additions & 1 deletion jupyter_server/serverapp.py
Expand Up @@ -2389,6 +2389,8 @@ async def cleanup_kernels(self):
The kernels will shutdown themselves when this process no longer exists,
but explicit shutdown allows the KernelManagers to cleanup the connection files.
"""
if not getattr(self, "kernel_manager", None):
return
n_kernels = len(self.kernel_manager.list_kernel_ids())
kernel_msg = trans.ngettext(
"Shutting down %d kernel", "Shutting down %d kernels", n_kernels
Expand All @@ -2398,6 +2400,8 @@ async def cleanup_kernels(self):

async def cleanup_extensions(self):
"""Call shutdown hooks in all extensions."""
if not getattr(self, "extension_manager", None):
return
n_extensions = len(self.extension_manager.extension_apps)
extension_msg = trans.ngettext(
"Shutting down %d extension", "Shutting down %d extensions", n_extensions
Expand Down Expand Up @@ -2676,6 +2680,8 @@ async def _cleanup(self):
self.remove_browser_open_files()
await self.cleanup_extensions()
await self.cleanup_kernels()
if getattr(self, "session_manager", None):
self.session_manager.close()

def start_ioloop(self):
"""Start the IO Loop."""
Expand Down Expand Up @@ -2704,7 +2710,8 @@ def start(self):
async def _stop(self):
"""Cleanup resources and stop the IO Loop."""
await self._cleanup()
self.io_loop.stop()
if getattr(self, "io_loop", None):
self.io_loop.stop()

def stop(self, from_signal=False):
"""Cleanup resources and stop the server."""
Expand Down
38 changes: 26 additions & 12 deletions jupyter_server/services/kernels/kernelmanager.py
Expand Up @@ -149,6 +149,7 @@ def _default_kernel_buffers(self):

def __init__(self, **kwargs):
self.pinned_superclass = MultiKernelManager
self._pending_kernel_tasks = {}
self.pinned_superclass.__init__(self, **kwargs)
self.last_kernel_activity = utcnow()

Expand Down Expand Up @@ -216,9 +217,11 @@ async def start_kernel(self, kernel_id=None, path=None, **kwargs):
kwargs["kernel_id"] = kernel_id
kernel_id = await ensure_async(self.pinned_superclass.start_kernel(self, **kwargs))
self._kernel_connections[kernel_id] = 0
fut = asyncio.ensure_future(self._finish_kernel_start(kernel_id))
task = asyncio.create_task(self._finish_kernel_start(kernel_id))
if not getattr(self, "use_pending_kernels", None):
await fut
await task
else:
self._pending_kernel_tasks[kernel_id] = task
# add busy/activity markers:
kernel = self.get_kernel(kernel_id)
kernel.execution_state = "starting"
Expand All @@ -245,8 +248,8 @@ async def _finish_kernel_start(self, kernel_id):
if hasattr(km, "ready"):
try:
await km.ready
except Exception:
self.log.exception(km.ready.exception())
except Exception as e:
self.log.exception(e)
return

self._kernel_ports[kernel_id] = km.ports
Expand Down Expand Up @@ -372,7 +375,7 @@ def stop_buffering(self, kernel_id):
buffer_info = self._kernel_buffers.pop(kernel_id)
# close buffering streams
for stream in buffer_info["channels"].values():
if not stream.closed():
if not stream.socket.closed:
stream.on_recv(None)
stream.close()

Expand All @@ -387,13 +390,18 @@ def stop_buffering(self, kernel_id):
def shutdown_kernel(self, kernel_id, now=False, restart=False):
"""Shutdown a kernel by kernel_id"""
self._check_kernel_id(kernel_id)
self.stop_watching_activity(kernel_id)
self.stop_buffering(kernel_id)

# Decrease the metric of number of kernels
# running for the relevant kernel type by 1
KERNEL_CURRENTLY_RUNNING_TOTAL.labels(type=self._kernels[kernel_id].kernel_name).dec()

if kernel_id in self._pending_kernel_tasks:
task = self._pending_kernel_tasks.pop(kernel_id)
task.cancel()
else:
self.stop_watching_activity(kernel_id)
self.stop_buffering(kernel_id)

self.pinned_superclass.shutdown_kernel(self, kernel_id, now=now, restart=restart)

async def restart_kernel(self, kernel_id, now=False):
Expand Down Expand Up @@ -533,7 +541,8 @@ def stop_watching_activity(self, kernel_id):
"""Stop watching IOPub messages on a kernel for activity."""
kernel = self._kernels[kernel_id]
if getattr(kernel, "_activity_stream", None):
kernel._activity_stream.close()
if not kernel._activity_stream.socket.closed:
kernel._activity_stream.close()
kernel._activity_stream = None

def initialize_culler(self):
Expand Down Expand Up @@ -638,19 +647,24 @@ def __init__(self, **kwargs):
self.pinned_superclass = AsyncMultiKernelManager
self.pinned_superclass.__init__(self, **kwargs)
self.last_kernel_activity = utcnow()
self._pending_kernel_tasks = {}

async def shutdown_kernel(self, kernel_id, now=False, restart=False):
"""Shutdown a kernel by kernel_id"""
self._check_kernel_id(kernel_id)
self.stop_watching_activity(kernel_id)
self.stop_buffering(kernel_id)

# Decrease the metric of number of kernels
# running for the relevant kernel type by 1
KERNEL_CURRENTLY_RUNNING_TOTAL.labels(type=self._kernels[kernel_id].kernel_name).dec()

if kernel_id in self._pending_kernel_tasks:
task = self._pending_kernel_tasks.pop(kernel_id)
task.cancel()
else:
self.stop_watching_activity(kernel_id)
self.stop_buffering(kernel_id)

# Finish shutting down the kernel before clearing state to avoid a race condition.
ret = await self.pinned_superclass.shutdown_kernel(
return await self.pinned_superclass.shutdown_kernel(
self, kernel_id, now=now, restart=restart
)
return ret
3 changes: 0 additions & 3 deletions tests/auth/test_authorizer.py
Expand Up @@ -205,7 +205,6 @@ async def test_authorized_requests(
send_request,
tmp_path,
jp_serverapp,
jp_cleanup_subprocesses,
method,
url,
body,
Expand Down Expand Up @@ -275,5 +274,3 @@ async def test_authorized_requests(

code = await send_request(url, body=body, method=method)
assert code in expected_codes

await jp_cleanup_subprocesses()
22 changes: 6 additions & 16 deletions tests/services/kernels/test_api.py
Expand Up @@ -67,7 +67,7 @@ async def test_no_kernels(jp_fetch):


@pytest.mark.timeout(TEST_TIMEOUT)
async def test_default_kernels(jp_fetch, jp_base_url, jp_cleanup_subprocesses):
async def test_default_kernels(jp_fetch, jp_base_url):
r = await jp_fetch("api", "kernels", method="POST", allow_nonstandard_methods=True)
kernel = json.loads(r.body.decode())
assert r.headers["location"] == url_path_join(jp_base_url, "/api/kernels/", kernel["id"])
Expand All @@ -79,13 +79,10 @@ async def test_default_kernels(jp_fetch, jp_base_url, jp_cleanup_subprocesses):
["frame-ancestors 'self'", "report-uri " + report_uri, "default-src 'none'"]
)
assert r.headers["Content-Security-Policy"] == expected_csp
await jp_cleanup_subprocesses()


@pytest.mark.timeout(TEST_TIMEOUT)
async def test_main_kernel_handler(
jp_fetch, jp_base_url, jp_cleanup_subprocesses, jp_serverapp, pending_kernel_is_ready
):
async def test_main_kernel_handler(jp_fetch, jp_base_url, jp_serverapp, pending_kernel_is_ready):
# Start the first kernel
r = await jp_fetch(
"api", "kernels", method="POST", body=json.dumps({"name": NATIVE_KERNEL_NAME})
Expand Down Expand Up @@ -158,11 +155,10 @@ async def test_main_kernel_handler(
)
kernel3 = json.loads(r.body.decode())
assert isinstance(kernel3, dict)
await jp_cleanup_subprocesses()


@pytest.mark.timeout(TEST_TIMEOUT)
async def test_kernel_handler(jp_fetch, jp_cleanup_subprocesses, pending_kernel_is_ready):
async def test_kernel_handler(jp_fetch, jp_serverapp, pending_kernel_is_ready):
# Create a kernel
r = await jp_fetch(
"api", "kernels", method="POST", body=json.dumps({"name": NATIVE_KERNEL_NAME})
Expand Down Expand Up @@ -206,13 +202,10 @@ async def test_kernel_handler(jp_fetch, jp_cleanup_subprocesses, pending_kernel_
with pytest.raises(tornado.httpclient.HTTPClientError) as e:
await jp_fetch("api", "kernels", bad_id, method="DELETE")
assert expected_http_error(e, 404, "Kernel does not exist: " + bad_id)
await jp_cleanup_subprocesses()


@pytest.mark.timeout(TEST_TIMEOUT)
async def test_kernel_handler_startup_error(
jp_fetch, jp_cleanup_subprocesses, jp_serverapp, jp_kernelspecs
):
async def test_kernel_handler_startup_error(jp_fetch, jp_serverapp, jp_kernelspecs):
if getattr(jp_serverapp.kernel_manager, "use_pending_kernels", False):
return

Expand All @@ -223,7 +216,7 @@ async def test_kernel_handler_startup_error(

@pytest.mark.timeout(TEST_TIMEOUT)
async def test_kernel_handler_startup_error_pending(
jp_fetch, jp_ws_fetch, jp_cleanup_subprocesses, jp_serverapp, jp_kernelspecs
jp_fetch, jp_ws_fetch, jp_serverapp, jp_kernelspecs
):
if not getattr(jp_serverapp.kernel_manager, "use_pending_kernels", False):
return
Expand All @@ -238,9 +231,7 @@ async def test_kernel_handler_startup_error_pending(


@pytest.mark.timeout(TEST_TIMEOUT)
async def test_connection(
jp_fetch, jp_ws_fetch, jp_http_port, jp_auth_header, jp_cleanup_subprocesses
):
async def test_connection(jp_fetch, jp_ws_fetch, jp_http_port, jp_auth_header):
# Create kernel
r = await jp_fetch(
"api", "kernels", method="POST", body=json.dumps({"name": NATIVE_KERNEL_NAME})
Expand Down Expand Up @@ -274,4 +265,3 @@ async def test_connection(
r = await jp_fetch("api", "kernels", kid, method="GET")
model = json.loads(r.body.decode())
assert model["connections"] == 0
await jp_cleanup_subprocesses()
8 changes: 2 additions & 6 deletions tests/services/kernels/test_cull.py
Expand Up @@ -43,7 +43,7 @@
),
],
)
async def test_cull_idle(jp_fetch, jp_ws_fetch, jp_cleanup_subprocesses):
async def test_cull_idle(jp_fetch, jp_ws_fetch):
r = await jp_fetch("api", "kernels", method="POST", allow_nonstandard_methods=True)
kernel = json.loads(r.body.decode())
kid = kernel["id"]
Expand All @@ -59,7 +59,6 @@ async def test_cull_idle(jp_fetch, jp_ws_fetch, jp_cleanup_subprocesses):
ws.close()
culled = await get_cull_status(kid, jp_fetch) # not connected, should be culled
assert culled
await jp_cleanup_subprocesses()


# Pending kernels was released in Jupyter Client 7.1
Expand Down Expand Up @@ -89,9 +88,7 @@ async def test_cull_idle(jp_fetch, jp_ws_fetch, jp_cleanup_subprocesses):
],
)
@pytest.mark.timeout(30)
async def test_cull_dead(
jp_fetch, jp_ws_fetch, jp_serverapp, jp_cleanup_subprocesses, jp_kernelspecs
):
async def test_cull_dead(jp_fetch, jp_ws_fetch, jp_serverapp, jp_kernelspecs):
r = await jp_fetch("api", "kernels", method="POST", allow_nonstandard_methods=True)
kernel = json.loads(r.body.decode())
kid = kernel["id"]
Expand All @@ -105,7 +102,6 @@ async def test_cull_dead(
assert model["connections"] == 0
culled = await get_cull_status(kid, jp_fetch) # connected, should not be culled
assert culled
await jp_cleanup_subprocesses()


async def get_cull_status(kid, jp_fetch):
Expand Down