Skip to content

Commit

Permalink
add sychronizer for rehydrating kernels and sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
Zsailer committed Mar 23, 2022
1 parent cd1b1b8 commit 319770b
Show file tree
Hide file tree
Showing 4 changed files with 426 additions and 0 deletions.
19 changes: 19 additions & 0 deletions jupyter_server/serverapp.py
Expand Up @@ -91,6 +91,7 @@
GatewaySessionManager,
GatewayClient,
)
from jupyter_server.sychronizer.synchronizer import Synchronizer
from jupyter_server.auth.authorizer import Authorizer, AllowAllAuthorizer

from jupyter_server.auth.login import LoginHandler
Expand Down Expand Up @@ -1763,6 +1764,10 @@ def _update_server_extensions(self, change):
config=True,
)

run_sychronizer = Bool(
False, help="If True, initializes and runs the Synchronizer.", config=True
)

_starter_app = Instance(
default_value=None,
allow_none=True,
Expand Down Expand Up @@ -1835,6 +1840,18 @@ def init_configurables(self):
)
self.authorizer = self.authorizer_class(parent=self, log=self.log)

if self.run_synchronizer:
fetch_remote_kernels = None
if self.gateway_config.gateway_enabled:
fetch_remote_kernels = self.kernel_manager.list_kernels
self.synchronizer = Synchronizer(
parent=self,
fetch_remote_kernels=fetch_remote_kernels,
multi_kernel_manager=self.kernel_manager,
session_manager=self.session_manager,
contents_manager=self.contents_manager,
)

def init_logging(self):
# This prevents double log messages because tornado use a root logger that
# self.log is a child of. The logging module dipatches log messages to a log
Expand Down Expand Up @@ -2759,6 +2776,8 @@ def start(self):
must be done prior to calling this method."""
self.start_app()
self.start_ioloop()
if self.run_synchronizer:
self.synchronizer.start_regular_syncing()

async def _stop(self):
"""Cleanup resources and stop the IO Loop."""
Expand Down
Empty file.
107 changes: 107 additions & 0 deletions jupyter_server/sychronizer/remote_kernel_table.py
@@ -0,0 +1,107 @@
import os
import sqlite3
from collections import namedtuple
from typing import List

from jupyter_core.paths import jupyter_runtime_dir


KernelMap = namedtuple("KernelMap", ["kernel_id", "remote_id"])


class RemoteKernelTable:
"""An SQLite database that stores the map between
Kernel ID (from Jupyter) and remote ID (from
Data Studio's Notebook Service).
"""

_table_name = "kernelmap"
_table_columns = ("kernel_id", "remote_id")
_db_name = "jupyter-session.db"
_connection = None
_cursor = None

@property
def cursor(self):
"""Start a cursor and create a database called 'session'"""
if self._cursor is None:
self._cursor = self.connection.cursor()
self._cursor.execute(
f"""CREATE TABLE IF NOT EXISTS {self._table_name}
({', '.join(self._table_columns)})"""
)
return self._cursor

@property
def connection(self):
"""Start a database connection"""
session_db_path = os.path.join(jupyter_runtime_dir(), self._db_name)
if self._connection is None:
self._connection = sqlite3.connect(session_db_path, isolation_level=None)
self._connection.row_factory = sqlite3.Row
return self._connection

def query(self, query_string, **identifiers):
"""Build and execute a query."""
if any(key in identifiers for key in self._table_columns):
query = query_string.format(
*list(identifiers.keys()),
table=self._table_name,
)
print(query, tuple(identifiers.values()))
self.cursor.execute(query, tuple(identifiers.values()))
else:
raise Exception("No kernel_id or remote_id given.")

def save(self, kernel_id: str = None, remote_id: str = None) -> None:
self.cursor.execute(f"INSERT INTO {self._table_name} VALUES (?,?)", (kernel_id, remote_id))

def exists(self, **identifier) -> bool:
"""Check to see if the session of a given name exists"""
self.query("SELECT * FROM {table} WHERE {0}=?", **identifier)
row = self.cursor.fetchone()
if row is not None:
return True
return False

def update(self, kernel_id=None, remote_id=None) -> None:
if self.exists(kernel_id=kernel_id):
self.query(
"UPDATE {table} SET {0}=? WHERE {1}=?",
remote_id=remote_id,
kernel_id=kernel_id,
)
elif self.exists(remote_id=remote_id):
self.query(
"UPDATE {table} SET {0}=? WHERE {1}=?",
kernel_id=kernel_id,
remote_id=remote_id,
)
else:
raise Exception("Couldn't find a matching entry in the kernelmap database.")

def delete(self, **identifier) -> None:
self.query("DELETE FROM {table} WHERE {0}=?", **identifier)

def row_to_model(self, row: sqlite3.Row) -> KernelMap:
return KernelMap(kernel_id=row["kernel_id"], remote_id=row["remote_id"])

def list(self) -> List[KernelMap]:
self.cursor.execute(f"SELECT * FROM {self._table_name}")
rows = self.cursor.fetchall()
return [self.row_to_model(row) for row in rows]

def get_remote_map(self) -> dict:
models = self.list()
return {m.remote_id: m.kernel_id for m in models}

def get_kernel_map(self) -> dict:
models = self.list()
return {m.kernel_id: m.remote_id for m in models}

def get(self, **identifier) -> KernelMap:
self.query("SELECT * FROM {table} WHERE {0}=?", **identifier)
row = self.cursor.fetchone()
if not row:
raise Exception("No match was found in database.")
return self.row_to_model(row)

0 comments on commit 319770b

Please sign in to comment.