Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] Kernel and session rehydration and syncing #752

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 19 additions & 0 deletions jupyter_server/serverapp.py
Expand Up @@ -91,6 +91,7 @@
GatewaySessionManager,
GatewayClient,
)
from jupyter_server.sychronizer.synchronizer import Synchronizer
from jupyter_server.auth.authorizer import Authorizer, AllowAllAuthorizer

from jupyter_server.auth.login import LoginHandler
Expand Down Expand Up @@ -1763,6 +1764,10 @@ def _update_server_extensions(self, change):
config=True,
)

run_sychronizer = Bool(
False, help="If True, initializes and runs the Synchronizer.", config=True
)

_starter_app = Instance(
default_value=None,
allow_none=True,
Expand Down Expand Up @@ -1835,6 +1840,18 @@ def init_configurables(self):
)
self.authorizer = self.authorizer_class(parent=self, log=self.log)

if self.run_synchronizer:
fetch_remote_kernels = None
if self.gateway_config.gateway_enabled:
fetch_remote_kernels = self.kernel_manager.list_kernels
self.synchronizer = Synchronizer(
parent=self,
fetch_remote_kernels=fetch_remote_kernels,
multi_kernel_manager=self.kernel_manager,
session_manager=self.session_manager,
contents_manager=self.contents_manager,
)

def init_logging(self):
# This prevents double log messages because tornado use a root logger that
# self.log is a child of. The logging module dipatches log messages to a log
Expand Down Expand Up @@ -2759,6 +2776,8 @@ def start(self):
must be done prior to calling this method."""
self.start_app()
self.start_ioloop()
if self.run_synchronizer:
self.synchronizer.start_regular_syncing()

async def _stop(self):
"""Cleanup resources and stop the IO Loop."""
Expand Down
Empty file.
106 changes: 106 additions & 0 deletions jupyter_server/sychronizer/remote_kernel_table.py
@@ -0,0 +1,106 @@
import os
import sqlite3
from collections import namedtuple
from typing import List

from jupyter_core.paths import jupyter_runtime_dir


KernelMap = namedtuple("KernelMap", ["kernel_id", "remote_id"])


class RemoteKernelTable:
"""An SQLite database that stores the map between
Kernel ID (from Jupyter) and remote ID.
"""

_table_name = "kernelmap"
_table_columns = ("kernel_id", "remote_id")
_db_name = "jupyter-session.db"
_connection = None
_cursor = None

@property
def cursor(self):
"""Start a cursor and create a database called 'session'"""
if self._cursor is None:
self._cursor = self.connection.cursor()
self._cursor.execute(
f"""CREATE TABLE IF NOT EXISTS {self._table_name}
({', '.join(self._table_columns)})"""
)
return self._cursor

@property
def connection(self):
"""Start a database connection"""
session_db_path = os.path.join(jupyter_runtime_dir(), self._db_name)
if self._connection is None:
self._connection = sqlite3.connect(session_db_path, isolation_level=None)
self._connection.row_factory = sqlite3.Row
return self._connection

def query(self, query_string, **identifiers):
"""Build and execute a query."""
if any(key in identifiers for key in self._table_columns):
query = query_string.format(
*list(identifiers.keys()),
table=self._table_name,
)
print(query, tuple(identifiers.values()))
self.cursor.execute(query, tuple(identifiers.values()))
else:
raise Exception("No kernel_id or remote_id given.")

def save(self, kernel_id: str = None, remote_id: str = None) -> None:
self.cursor.execute(f"INSERT INTO {self._table_name} VALUES (?,?)", (kernel_id, remote_id))

def exists(self, **identifier) -> bool:
"""Check to see if the session of a given name exists"""
self.query("SELECT * FROM {table} WHERE {0}=?", **identifier)
row = self.cursor.fetchone()
if row is not None:
return True
return False

def update(self, kernel_id=None, remote_id=None) -> None:
if self.exists(kernel_id=kernel_id):
self.query(
"UPDATE {table} SET {0}=? WHERE {1}=?",
remote_id=remote_id,
kernel_id=kernel_id,
)
elif self.exists(remote_id=remote_id):
self.query(
"UPDATE {table} SET {0}=? WHERE {1}=?",
kernel_id=kernel_id,
remote_id=remote_id,
)
else:
raise Exception("Couldn't find a matching entry in the kernelmap database.")

def delete(self, **identifier) -> None:
self.query("DELETE FROM {table} WHERE {0}=?", **identifier)

def row_to_model(self, row: sqlite3.Row) -> KernelMap:
return KernelMap(kernel_id=row["kernel_id"], remote_id=row["remote_id"])

def list(self) -> List[KernelMap]:
self.cursor.execute(f"SELECT * FROM {self._table_name}")
rows = self.cursor.fetchall()
return [self.row_to_model(row) for row in rows]

def get_remote_map(self) -> dict:
models = self.list()
return {m.remote_id: m.kernel_id for m in models}

def get_kernel_map(self) -> dict:
models = self.list()
return {m.kernel_id: m.remote_id for m in models}

def get(self, **identifier) -> KernelMap:
self.query("SELECT * FROM {table} WHERE {0}=?", **identifier)
row = self.cursor.fetchone()
if not row:
raise Exception("No match was found in database.")
return self.row_to_model(row)