Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert changes to the Comm Interface #1048

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
187 changes: 147 additions & 40 deletions ipykernel/comm/comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,70 @@
# Distributed under the terms of the Modified BSD License.

import uuid
from typing import Optional

import comm.base_comm
import traitlets.config
from traitlets import Bool, Bytes, Instance, Unicode, default
from traitlets import Any, Bool, Bytes, Dict, Instance, Unicode, default
from traitlets.config import LoggingConfigurable

from ipykernel.jsonutil import json_clean
from ipykernel.kernelbase import Kernel


# this is the class that will be created if we do comm.create_comm
class BaseComm(comm.base_comm.BaseComm):
kernel: Optional[Kernel] = None
class Comm(LoggingConfigurable):
"""Class for communicating between a Frontend and a Kernel"""

def publish_msg(self, msg_type, data=None, metadata=None, buffers=None, **keys):
"""Helper for sending a comm message on IOPub"""
if not Kernel.initialized():
return
kernel = Instance("ipykernel.kernelbase.Kernel", allow_none=True)

@default("kernel")
def _default_kernel(self):
if Kernel.initialized():
return Kernel.instance()

comm_id = Unicode()

@default("comm_id")
def _default_comm_id(self):
return uuid.uuid4().hex

primary = Bool(True, help="Am I the primary or secondary Comm?")

target_name = Unicode("comm")
target_module = Unicode(
None,
allow_none=True,
help="""requirejs module from
which to load comm target.""",
)

topic = Bytes()

@default("topic")
def _default_topic(self):
return ("comm-%s" % self.comm_id).encode("ascii")

_open_data = Dict(help="data dict, if any, to be included in comm_open")
_close_data = Dict(help="data dict, if any, to be included in comm_close")

_msg_callback = Any()
_close_callback = Any()

_closed = Bool(True)

def __init__(self, target_name="", data=None, metadata=None, buffers=None, **kwargs):
if target_name:
kwargs["target_name"] = target_name
super().__init__(**kwargs)
if self.kernel:
if self.primary:
# I am primary, open my peer.
self.open(data=data, metadata=metadata, buffers=buffers)
else:
self._closed = False

def _publish_msg(self, msg_type, data=None, metadata=None, buffers=None, **keys):
"""Helper for sending a comm message on IOPub"""
data = {} if data is None else data
metadata = {} if metadata is None else metadata
content = json_clean(dict(data=data, comm_id=self.comm_id, **keys))

if self.kernel is None:
self.kernel = Kernel.instance()

self.kernel.session.send(
self.kernel.iopub_socket,
msg_type,
Expand All @@ -40,38 +78,107 @@ def publish_msg(self, msg_type, data=None, metadata=None, buffers=None, **keys):
buffers=buffers,
)

def __del__(self):
"""trigger close on gc"""
self.close(deleting=True)

# publishing messages

def open(self, data=None, metadata=None, buffers=None):
"""Open the frontend-side version of this comm"""
if data is None:
data = self._open_data
comm_manager = getattr(self.kernel, "comm_manager", None)
if comm_manager is None:
raise RuntimeError(
"Comms cannot be opened without a kernel "
"and a comm_manager attached to that kernel."
)

comm_manager.register_comm(self)
try:
self._publish_msg(
"comm_open",
data=data,
metadata=metadata,
buffers=buffers,
target_name=self.target_name,
target_module=self.target_module,
)
self._closed = False
except Exception:
comm_manager.unregister_comm(self)
raise

def close(self, data=None, metadata=None, buffers=None, deleting=False):
"""Close the frontend-side version of this comm"""
if self._closed:
# only close once
return
self._closed = True
# nothing to send if we have no kernel
# can be None during interpreter cleanup
if not self.kernel:
return
if data is None:
data = self._close_data
self._publish_msg(
"comm_close",
data=data,
metadata=metadata,
buffers=buffers,
)
if not deleting:
# If deleting, the comm can't be registered
self.kernel.comm_manager.unregister_comm(self)

def send(self, data=None, metadata=None, buffers=None):
"""Send a message to the frontend-side version of this comm"""
self._publish_msg(
"comm_msg",
data=data,
metadata=metadata,
buffers=buffers,
)

# but for backwards compatibility, we need to inherit from LoggingConfigurable
class Comm(traitlets.config.LoggingConfigurable, BaseComm):
"""Class for communicating between a Frontend and a Kernel"""
# registering callbacks

kernel = Instance("ipykernel.kernelbase.Kernel", allow_none=True) # type:ignore[assignment]
comm_id = Unicode()
primary = Bool(True, help="Am I the primary or secondary Comm?")
def on_close(self, callback):
"""Register a callback for comm_close

target_name = Unicode("comm")
target_module = Unicode(
None,
allow_none=True,
help="""requirejs module from
which to load comm target.""",
)
Will be called with the `data` of the close message.

topic = Bytes()
Call `on_close(None)` to disable an existing callback.
"""
self._close_callback = callback

@default("kernel")
def _default_kernel(self):
if Kernel.initialized():
return Kernel.instance()
def on_msg(self, callback):
"""Register a callback for comm_msg

@default("comm_id")
def _default_comm_id(self):
return uuid.uuid4().hex
Will be called with the `data` of any comm_msg messages.

Call `on_msg(None)` to disable an existing callback.
"""
self._msg_callback = callback

# handling of incoming messages

def handle_close(self, msg):
"""Handle a comm_close message"""
self.log.debug("handle_close[%s](%s)", self.comm_id, msg)
if self._close_callback:
self._close_callback(msg)

def __init__(self, *args, **kwargs):
# Comm takes positional arguments, LoggingConfigurable does not, so we explicitly forward arguments
traitlets.config.LoggingConfigurable.__init__(self, **kwargs)
BaseComm.__init__(self, *args, **kwargs)
def handle_msg(self, msg):
"""Handle a comm_msg message"""
self.log.debug("handle_msg[%s](%s)", self.comm_id, msg)
if self._msg_callback:
shell = self.kernel.shell
if shell:
shell.events.trigger("pre_execute")
self._msg_callback(msg)
if shell:
shell.events.trigger("post_execute")


__all__ = ["Comm"]
134 changes: 123 additions & 11 deletions ipykernel/comm/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,131 @@
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import logging

import comm.base_comm
import traitlets
import traitlets.config
from traitlets import Dict, Instance
from traitlets.config import LoggingConfigurable
from traitlets.utils.importstring import import_item

from .comm import Comm

class CommManager(traitlets.config.LoggingConfigurable, comm.base_comm.CommManager):

kernel = traitlets.Instance("ipykernel.kernelbase.Kernel")
comms = traitlets.Dict()
targets = traitlets.Dict()
class CommManager(LoggingConfigurable):
"""Manager for Comms in the Kernel"""

def __init__(self, **kwargs):
# CommManager doesn't take arguments, so we explicitly forward arguments
traitlets.config.LoggingConfigurable.__init__(self, **kwargs)
comm.base_comm.CommManager.__init__(self)
kernel = Instance("ipykernel.kernelbase.Kernel")
comms = Dict()
targets = Dict()

# Public APIs

def register_target(self, target_name, f):
"""Register a callable f for a given target name

f will be called with two arguments when a comm_open message is received with `target`:

- the Comm instance
- the `comm_open` message itself.

f can be a Python callable or an import string for one.
"""
if isinstance(f, str):
f = import_item(f)

self.targets[target_name] = f

def unregister_target(self, target_name, f):
"""Unregister a callable registered with register_target"""
return self.targets.pop(target_name)

def register_comm(self, comm):
"""Register a new comm"""
comm_id = comm.comm_id
comm.kernel = self.kernel
self.comms[comm_id] = comm
return comm_id

def unregister_comm(self, comm):
"""Unregister a comm, and close its counterpart"""
# unlike get_comm, this should raise a KeyError
comm = self.comms.pop(comm.comm_id)

def get_comm(self, comm_id):
"""Get a comm with a particular id

Returns the comm if found, otherwise None.

This will not raise an error,
it will log messages if the comm cannot be found.
"""
try:
return self.comms[comm_id]
except KeyError:
self.log.warning("No such comm: %s", comm_id)
if self.log.isEnabledFor(logging.DEBUG):
# don't create the list of keys if debug messages aren't enabled
self.log.debug("Current comms: %s", list(self.comms.keys()))

# Message handlers
def comm_open(self, stream, ident, msg):
"""Handler for comm_open messages"""
content = msg["content"]
comm_id = content["comm_id"]
target_name = content["target_name"]
f = self.targets.get(target_name, None)
comm = Comm(
comm_id=comm_id,
primary=False,
target_name=target_name,
)
self.register_comm(comm)
if f is None:
self.log.error("No such comm target registered: %s", target_name)
else:
try:
f(comm, msg)
return
except Exception:
self.log.error("Exception opening comm with target: %s", target_name, exc_info=True)

# Failure.
try:
comm.close()
except Exception:
self.log.error(
"""Could not close comm during `comm_open` failure
clean-up. The comm may not have been opened yet.""",
exc_info=True,
)

def comm_msg(self, stream, ident, msg):
"""Handler for comm_msg messages"""
content = msg["content"]
comm_id = content["comm_id"]
comm = self.get_comm(comm_id)
if comm is None:
return

try:
comm.handle_msg(msg)
except Exception:
self.log.error("Exception in comm_msg for %s", comm_id, exc_info=True)

def comm_close(self, stream, ident, msg):
"""Handler for comm_close messages"""
content = msg["content"]
comm_id = content["comm_id"]
comm = self.get_comm(comm_id)
if comm is None:
return

self.comms[comm_id]._closed = True
del self.comms[comm_id]

try:
comm.handle_close(msg)
except Exception:
self.log.error("Exception in comm_close for %s", comm_id, exc_info=True)


__all__ = ["CommManager"]