Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
blink1073 committed Apr 25, 2022
1 parent 1a12302 commit 68c4b7e
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 15 deletions.
6 changes: 3 additions & 3 deletions jupyter_server/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,11 +472,11 @@ def inner(nbpath):


@pytest.fixture(autouse=True)
def jp_server_cleanup():
def jp_server_cleanup(io_loop):
yield
app: ServerApp = ServerApp.instance()
loop = asyncio.get_event_loop_policy().get_event_loop()
loop.run_until_complete(app._stop())
loop = io_loop.asyncio_loop
loop.run_until_complete(app._cleanup())
ServerApp.clear_instance()


Expand Down
36 changes: 26 additions & 10 deletions jupyter_server/services/kernels/kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def _default_kernel_buffers(self):

def __init__(self, **kwargs):
self.pinned_superclass = MultiKernelManager
self._pending_kernel_tasks = {}
self.pinned_superclass.__init__(self, **kwargs)
self.last_kernel_activity = utcnow()

Expand Down Expand Up @@ -216,9 +217,11 @@ async def start_kernel(self, kernel_id=None, path=None, **kwargs):
kwargs["kernel_id"] = kernel_id
kernel_id = await ensure_async(self.pinned_superclass.start_kernel(self, **kwargs))
self._kernel_connections[kernel_id] = 0
fut = asyncio.ensure_future(self._finish_kernel_start(kernel_id))
task = asyncio.create_task(self._finish_kernel_start(kernel_id))
if not getattr(self, "use_pending_kernels", None):
await fut
await task
else:
self._pending_kernel_tasks[kernel_id] = task
# add busy/activity markers:
kernel = self.get_kernel(kernel_id)
kernel.execution_state = "starting"
Expand Down Expand Up @@ -372,7 +375,7 @@ def stop_buffering(self, kernel_id):
buffer_info = self._kernel_buffers.pop(kernel_id)
# close buffering streams
for stream in buffer_info["channels"].values():
if not stream.closed():
if not stream.socket.closed:
stream.on_recv(None)
stream.close()

Expand All @@ -387,13 +390,19 @@ def stop_buffering(self, kernel_id):
def shutdown_kernel(self, kernel_id, now=False, restart=False):
"""Shutdown a kernel by kernel_id"""
self._check_kernel_id(kernel_id)
self.stop_watching_activity(kernel_id)
self.stop_buffering(kernel_id)

# Decrease the metric of number of kernels
# running for the relevant kernel type by 1
KERNEL_CURRENTLY_RUNNING_TOTAL.labels(type=self._kernels[kernel_id].kernel_name).dec()

if kernel_id in self._pending_kernel_tasks:
task = self._pending_kernel_tasks.pop(kernel_id)
task.cancel()
return

self.stop_watching_activity(kernel_id)
self.stop_buffering(kernel_id)

self.pinned_superclass.shutdown_kernel(self, kernel_id, now=now, restart=restart)

async def restart_kernel(self, kernel_id, now=False):
Expand Down Expand Up @@ -533,7 +542,8 @@ def stop_watching_activity(self, kernel_id):
"""Stop watching IOPub messages on a kernel for activity."""
kernel = self._kernels[kernel_id]
if getattr(kernel, "_activity_stream", None):
kernel._activity_stream.close()
if not kernel._activity_stream.socket.closed:
kernel._activity_stream.close()
kernel._activity_stream = None

def initialize_culler(self):
Expand Down Expand Up @@ -638,19 +648,25 @@ def __init__(self, **kwargs):
self.pinned_superclass = AsyncMultiKernelManager
self.pinned_superclass.__init__(self, **kwargs)
self.last_kernel_activity = utcnow()
self._pending_kernel_tasks = {}

async def shutdown_kernel(self, kernel_id, now=False, restart=False):
"""Shutdown a kernel by kernel_id"""
self._check_kernel_id(kernel_id)
self.stop_watching_activity(kernel_id)
self.stop_buffering(kernel_id)

# Decrease the metric of number of kernels
# running for the relevant kernel type by 1
KERNEL_CURRENTLY_RUNNING_TOTAL.labels(type=self._kernels[kernel_id].kernel_name).dec()

if kernel_id in self._pending_kernel_tasks:
task = self._pending_kernel_tasks.pop(kernel_id)
task.cancel()
return

self.stop_watching_activity(kernel_id)
self.stop_buffering(kernel_id)

# Finish shutting down the kernel before clearing state to avoid a race condition.
ret = await self.pinned_superclass.shutdown_kernel(
return await self.pinned_superclass.shutdown_kernel(
self, kernel_id, now=now, restart=restart
)
return ret
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ addopts = "-raXs --durations 10 --color=yes --doctest-modules"
testpaths = [
"tests/"
]
timeout = 300
timeout = 30
# Restore this setting to debug failures
# timeout_method = "thread"
timeout_method = "thread"
filterwarnings = [
"error",
"ignore:There is no current event loop:DeprecationWarning",
Expand Down

0 comments on commit 68c4b7e

Please sign in to comment.