Skip to content

Commit

Permalink
Dump table schemas and make CI fail when they change (#5264)
Browse files Browse the repository at this point in the history
* dump table schemas

Signed-off-by: harupy <hkawamura0130@gmail.com>

* detect diff

Signed-off-by: harupy <hkawamura0130@gmail.com>

* fix echos

Signed-off-by: harupy <hkawamura0130@gmail.com>

* add exit 1

Signed-off-by: harupy <hkawamura0130@gmail.com>

* create directory on host

Signed-off-by: harupy <hkawamura0130@gmail.com>

* ignore ordering

Signed-off-by: harupy <hkawamura0130@gmail.com>

* lint

Signed-off-by: harupy <hkawamura0130@gmail.com>

* use sorted

Signed-off-by: harupy <hkawamura0130@gmail.com>

* update migration instructions

Signed-off-by: harupy <hkawamura0130@gmail.com>
  • Loading branch information
harupy committed Jan 14, 2022
1 parent 4c58179 commit e365e69
Show file tree
Hide file tree
Showing 10 changed files with 612 additions and 11 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/master.yml
Expand Up @@ -174,6 +174,16 @@ jobs:
docker-compose run mlflow-postgres
docker-compose run mlflow-mysql
docker-compose run mlflow-mssql
# Fail the job when regenerating the schema files changed anything tracked by git.
diff=$(git diff --color)
# NOTE: `[` is a command and `]` must be its own argument, so the space before `]` is required.
if [ -n "$diff" ]
then
  echo "$diff"
  echo 'Table schemas changed (see diff above).'
  echo 'Please run `cd tests/db && ./update_schemas.sh` and ensure new schemas are correct.'
  exit 1
fi
docker-compose down --volumes --remove-orphans --rmi all
- name: Run anaconda compatibility tests
run: |
Expand Down
2 changes: 2 additions & 0 deletions CONTRIBUTING.rst
Expand Up @@ -449,6 +449,8 @@ checkout of MLflow:
# MLflow relies on Alembic (https://alembic.sqlalchemy.org) for schema migrations.
$ alembic -c mlflow/store/db_migrations/alembic.ini revision -m "add new field to db"
Generating ~/mlflow/mlflow/store/db_migrations/versions/b446d3984cfa_add_new_field_to_db.py
# Update schema files
$ cd tests/db && ./update_schemas.sh
These commands generate a new migration script (e.g., at ``~/mlflow/mlflow/alembic/versions/12341123_add_new_field_to_db.py``)
Expand Down
1 change: 0 additions & 1 deletion tests/db/.gitignore
@@ -1,2 +1 @@
schemas
mlflowdb
2 changes: 1 addition & 1 deletion tests/db/docker-compose.yml
Expand Up @@ -68,4 +68,4 @@ services:
- .:/tmp/mlflow
environment:
MLFLOW_TRACKING_URI: sqlite:////tmp/mlflow/mlflowdb
command: python run_checks.py --schema-output schemas/sqlite.sql
command: bash -ex -c "rm -rf mlflowdb && python run_checks.py --schema-output schemas/sqlite.sql"
54 changes: 45 additions & 9 deletions tests/db/run_checks.py
@@ -1,4 +1,5 @@
import os
import re
import argparse

import sqlalchemy
Expand All @@ -24,7 +25,6 @@ def parse_args():

def run_logging_operations():
with mlflow.start_run() as run:
print("Tracking URI:", mlflow.get_tracking_uri())
mlflow.log_param("p", "param")
mlflow.log_metric("m", 1.0)
mlflow.set_tag("t", "tag")
Expand All @@ -33,7 +33,6 @@ def run_logging_operations():
python_model=MockModel(),
registered_model_name="mock",
)
print(mlflow.get_run(run.info.run_id))

# Ensure the following migration scripts work correctly:
# - cfd24bdc0731_update_run_status_constraint_with_killed.py
Expand All @@ -54,19 +53,56 @@ def get_db_schema():
return "\n".join(lines)


def get_create_tables(schema):
    """
    Find every ``CREATE TABLE`` statement in a schema dump.

    :param schema: Schema dump as a single string. Each statement is expected to look like
                   ``CREATE TABLE <name> (`` on one line, followed by one column or
                   constraint per line, followed by a line that starts with ``)``.
    :return: A list of ``re.Match`` objects in order of appearance. Each match exposes the
             named groups ``table_name`` and ``columns_and_constraints``.
    """
    # The body of a statement contains spaces and newlines, so it has to be matched with
    # `.+?` under `re.DOTALL`. The quantifier is lazy so that the match stops at the first
    # newline that is immediately followed by `)`, i.e. at the end of the current table.
    pattern = r"""
CREATE TABLE (?P<table_name>\S+?) \(
(?P<columns_and_constraints>.+?)
\)
""".strip()
    return list(re.finditer(pattern, schema, flags=re.DOTALL))


def is_schema_changed(new, old):
    """
    Compare two schema dumps table by table, ignoring the order of columns and constraints
    within each table.

    NOTE: despite its name, this function returns ``True`` when the two schemas are
    equivalent and ``False`` when they differ. ``main`` calls it as
    ``not is_schema_changed(...)``, so the return convention is intentionally preserved.

    :param new: Newly dumped schema (string).
    :param old: Previously stored schema (string).
    :return: ``False`` if the number of tables, a table name, or a table's set of
             columns/constraints differs; ``True`` otherwise.
    """

    def _normalized_entries(table):
        # Every entry except the last one ends with a comma, so a pure reordering that
        # moves the last entry would otherwise look like a change. Drop trailing
        # whitespace and the trailing comma before sorting.
        body = table.group("columns_and_constraints")
        return sorted(line.rstrip().rstrip(",") for line in body.splitlines())

    tables_new = get_create_tables(new)
    tables_old = get_create_tables(old)

    if len(tables_new) != len(tables_old):
        return False

    for table_new, table_old in zip(tables_new, tables_old):
        if table_new.group("table_name") != table_old.group("table_name"):
            return False

        # Check whether the new and old schemas have the same columns and constraints
        if _normalized_entries(table_new) != _normalized_entries(table_old):
            return False

    return True


def write_file(s, path):
    """
    Write the string ``s`` to the file at ``path``, replacing any existing content.

    :param s: Text to write.
    :param path: Destination file path; opened in text mode ``"w"``.
    """
    out_file = open(path, "w")
    # The context manager closes the handle even if the write fails.
    with out_file:
        out_file.write(s)


def main():
    """
    Run the logging operations, dump the resulting database schema, and store it in the
    file given by ``args.schema_output``.

    The schema is printed to stdout. If the output file already exists, it is only
    rewritten when ``is_schema_changed`` reports a difference, so that an equivalent
    schema does not produce a spurious file change. If the file does not exist, it is
    created.
    """
    assert _TRACKING_URI_ENV_VAR in os.environ
    print("Tracking URI:", os.environ.get(_TRACKING_URI_ENV_VAR))

    args = parse_args()
    run_logging_operations()
    schema = get_db_schema()
    title = "Schema"
    print("=" * 10, title, "=" * 10)
    print(schema)
    print("=" * (20 + 2 + len(title)))

    schema_output = args.schema_output
    # `os.makedirs("")` raises, so only create the directory when the path has one.
    output_dir = os.path.dirname(schema_output)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # The existing file must be read BEFORE anything is written to it; writing first
    # would make the comparison below always see identical content.
    if os.path.exists(schema_output):
        with open(schema_output) as f:
            existing_schema = f.read()
        # `is_schema_changed` returns False when the two schemas differ.
        if not is_schema_changed(schema, existing_schema):
            write_file(schema, schema_output)
    else:
        write_file(schema, schema_output)


if __name__ == "__main__":
Expand Down
130 changes: 130 additions & 0 deletions tests/db/schemas/mssql.sql
@@ -0,0 +1,130 @@

CREATE TABLE alembic_version (
version_num VARCHAR(32) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
CONSTRAINT alembic_version_pkc PRIMARY KEY (version_num)
)


CREATE TABLE experiments (
experiment_id INTEGER GENERATED BY DEFAULT AS IDENTITY (INCREMENT BY 1 START WITH 1),
name VARCHAR(256) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
artifact_location VARCHAR(256) COLLATE "SQL_Latin1_General_CP1_CI_AS",
lifecycle_stage VARCHAR(32) COLLATE "SQL_Latin1_General_CP1_CI_AS",
CONSTRAINT experiment_pk PRIMARY KEY (experiment_id)
)


CREATE TABLE registered_models (
name VARCHAR(256) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
creation_time BIGINT,
last_updated_time BIGINT,
description VARCHAR(5000) COLLATE "SQL_Latin1_General_CP1_CI_AS",
CONSTRAINT registered_model_pk PRIMARY KEY (name)
)


CREATE TABLE experiment_tags (
key VARCHAR(250) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
value VARCHAR(5000) COLLATE "SQL_Latin1_General_CP1_CI_AS",
experiment_id INTEGER NOT NULL,
CONSTRAINT experiment_tag_pk PRIMARY KEY (key, experiment_id),
CONSTRAINT "FK__experimen__exper__3C69FB99" FOREIGN KEY(experiment_id) REFERENCES experiments (experiment_id)
)


CREATE TABLE model_versions (
name VARCHAR(256) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
version INTEGER NOT NULL,
creation_time BIGINT,
last_updated_time BIGINT,
description VARCHAR(5000) COLLATE "SQL_Latin1_General_CP1_CI_AS",
user_id VARCHAR(256) COLLATE "SQL_Latin1_General_CP1_CI_AS",
current_stage VARCHAR(20) COLLATE "SQL_Latin1_General_CP1_CI_AS",
source VARCHAR(500) COLLATE "SQL_Latin1_General_CP1_CI_AS",
run_id VARCHAR(32) COLLATE "SQL_Latin1_General_CP1_CI_AS",
status VARCHAR(20) COLLATE "SQL_Latin1_General_CP1_CI_AS",
status_message VARCHAR(500) COLLATE "SQL_Latin1_General_CP1_CI_AS",
run_link VARCHAR(500) COLLATE "SQL_Latin1_General_CP1_CI_AS",
CONSTRAINT model_version_pk PRIMARY KEY (name, version),
CONSTRAINT "FK__model_vers__name__44FF419A" FOREIGN KEY(name) REFERENCES registered_models (name) ON UPDATE CASCADE
)


CREATE TABLE registered_model_tags (
key VARCHAR(250) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
value VARCHAR(5000) COLLATE "SQL_Latin1_General_CP1_CI_AS",
name VARCHAR(256) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
CONSTRAINT registered_model_tag_pk PRIMARY KEY (key, name),
CONSTRAINT "FK__registered__name__48CFD27E" FOREIGN KEY(name) REFERENCES registered_models (name) ON UPDATE CASCADE
)


CREATE TABLE runs (
run_uuid VARCHAR(32) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
name VARCHAR(250) COLLATE "SQL_Latin1_General_CP1_CI_AS",
source_type VARCHAR(20) COLLATE "SQL_Latin1_General_CP1_CI_AS",
source_name VARCHAR(500) COLLATE "SQL_Latin1_General_CP1_CI_AS",
entry_point_name VARCHAR(50) COLLATE "SQL_Latin1_General_CP1_CI_AS",
user_id VARCHAR(256) COLLATE "SQL_Latin1_General_CP1_CI_AS",
status VARCHAR(9) COLLATE "SQL_Latin1_General_CP1_CI_AS",
start_time BIGINT,
end_time BIGINT,
source_version VARCHAR(50) COLLATE "SQL_Latin1_General_CP1_CI_AS",
lifecycle_stage VARCHAR(20) COLLATE "SQL_Latin1_General_CP1_CI_AS",
artifact_uri VARCHAR(200) COLLATE "SQL_Latin1_General_CP1_CI_AS",
experiment_id INTEGER,
CONSTRAINT run_pk PRIMARY KEY (run_uuid),
CONSTRAINT "FK__runs__experiment__2B3F6F97" FOREIGN KEY(experiment_id) REFERENCES experiments (experiment_id)
)


CREATE TABLE latest_metrics (
key VARCHAR(250) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
value FLOAT NOT NULL,
timestamp BIGINT,
step BIGINT NOT NULL,
is_nan BIT NOT NULL,
run_uuid VARCHAR(32) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
CONSTRAINT latest_metric_pk PRIMARY KEY (key, run_uuid),
CONSTRAINT "FK__latest_me__run_u__3F466844" FOREIGN KEY(run_uuid) REFERENCES runs (run_uuid)
)


CREATE TABLE metrics (
key VARCHAR(250) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
value FLOAT NOT NULL,
timestamp BIGINT NOT NULL,
run_uuid VARCHAR(32) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
step BIGINT DEFAULT ('0') NOT NULL,
is_nan BIT DEFAULT ('0') NOT NULL,
CONSTRAINT metric_pk PRIMARY KEY (key, timestamp, step, run_uuid, value, is_nan),
CONSTRAINT "FK__metrics__run_uui__30F848ED" FOREIGN KEY(run_uuid) REFERENCES runs (run_uuid)
)


CREATE TABLE model_version_tags (
key VARCHAR(250) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
value VARCHAR(5000) COLLATE "SQL_Latin1_General_CP1_CI_AS",
name VARCHAR(256) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
version INTEGER NOT NULL,
CONSTRAINT model_version_tag_pk PRIMARY KEY (key, name, version),
CONSTRAINT "FK__model_version_ta__4BAC3F29" FOREIGN KEY(name, version) REFERENCES model_versions (name, version) ON UPDATE CASCADE
)


CREATE TABLE params (
key VARCHAR(250) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
value VARCHAR(250) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
run_uuid VARCHAR(32) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
CONSTRAINT param_pk PRIMARY KEY (key, run_uuid),
CONSTRAINT "FK__params__run_uuid__33D4B598" FOREIGN KEY(run_uuid) REFERENCES runs (run_uuid)
)


CREATE TABLE tags (
key VARCHAR(250) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
value VARCHAR(5000) COLLATE "SQL_Latin1_General_CP1_CI_AS",
run_uuid VARCHAR(32) COLLATE "SQL_Latin1_General_CP1_CI_AS" NOT NULL,
CONSTRAINT tag_pk PRIMARY KEY (key, run_uuid),
CONSTRAINT "FK__tags__run_uuid__2E1BDC42" FOREIGN KEY(run_uuid) REFERENCES runs (run_uuid)
)
137 changes: 137 additions & 0 deletions tests/db/schemas/mysql.sql
@@ -0,0 +1,137 @@

CREATE TABLE alembic_version (
version_num VARCHAR(32) NOT NULL,
PRIMARY KEY (version_num)
)


CREATE TABLE experiments (
experiment_id INTEGER NOT NULL,
name VARCHAR(256) NOT NULL,
artifact_location VARCHAR(256),
lifecycle_stage VARCHAR(32),
PRIMARY KEY (experiment_id),
CONSTRAINT experiments_lifecycle_stage CHECK ((`lifecycle_stage` in (_utf8mb4'active',_utf8mb4'deleted')))
)


CREATE TABLE registered_models (
name VARCHAR(256) NOT NULL,
creation_time BIGINT,
last_updated_time BIGINT,
description VARCHAR(5000),
PRIMARY KEY (name)
)


CREATE TABLE experiment_tags (
key VARCHAR(250) NOT NULL,
value VARCHAR(5000),
experiment_id INTEGER NOT NULL,
PRIMARY KEY (key, experiment_id),
CONSTRAINT experiment_tags_ibfk_1 FOREIGN KEY(experiment_id) REFERENCES experiments (experiment_id)
)


CREATE TABLE model_versions (
name VARCHAR(256) NOT NULL,
version INTEGER NOT NULL,
creation_time BIGINT,
last_updated_time BIGINT,
description VARCHAR(5000),
user_id VARCHAR(256),
current_stage VARCHAR(20),
source VARCHAR(500),
run_id VARCHAR(32),
status VARCHAR(20),
status_message VARCHAR(500),
run_link VARCHAR(500),
PRIMARY KEY (name, version),
CONSTRAINT model_versions_ibfk_1 FOREIGN KEY(name) REFERENCES registered_models (name) ON UPDATE CASCADE
)


CREATE TABLE registered_model_tags (
key VARCHAR(250) NOT NULL,
value VARCHAR(5000),
name VARCHAR(256) NOT NULL,
PRIMARY KEY (key, name),
CONSTRAINT registered_model_tags_ibfk_1 FOREIGN KEY(name) REFERENCES registered_models (name) ON UPDATE CASCADE
)


CREATE TABLE runs (
run_uuid VARCHAR(32) NOT NULL,
name VARCHAR(250),
source_type VARCHAR(20),
source_name VARCHAR(500),
entry_point_name VARCHAR(50),
user_id VARCHAR(256),
status VARCHAR(9),
start_time BIGINT,
end_time BIGINT,
source_version VARCHAR(50),
lifecycle_stage VARCHAR(20),
artifact_uri VARCHAR(200),
experiment_id INTEGER,
PRIMARY KEY (run_uuid),
CONSTRAINT runs_ibfk_1 FOREIGN KEY(experiment_id) REFERENCES experiments (experiment_id),
CONSTRAINT runs_chk_1 CHECK ((`status` in (_utf8mb4'SCHEDULED',_utf8mb4'FAILED',_utf8mb4'FINISHED',_utf8mb4'RUNNING',_utf8mb4'KILLED'))),
CONSTRAINT runs_lifecycle_stage CHECK ((`lifecycle_stage` in (_utf8mb4'active',_utf8mb4'deleted'))),
CONSTRAINT source_type CHECK ((`source_type` in (_utf8mb4'NOTEBOOK',_utf8mb4'JOB',_utf8mb4'LOCAL',_utf8mb4'UNKNOWN',_utf8mb4'PROJECT')))
)


CREATE TABLE latest_metrics (
key VARCHAR(250) NOT NULL,
value DOUBLE NOT NULL,
timestamp BIGINT,
step BIGINT NOT NULL,
is_nan TINYINT NOT NULL,
run_uuid VARCHAR(32) NOT NULL,
PRIMARY KEY (key, run_uuid),
CONSTRAINT latest_metrics_ibfk_1 FOREIGN KEY(run_uuid) REFERENCES runs (run_uuid),
CONSTRAINT latest_metrics_chk_1 CHECK ((`is_nan` in (0,1)))
)


CREATE TABLE metrics (
key VARCHAR(250) NOT NULL,
value DOUBLE NOT NULL,
timestamp BIGINT NOT NULL,
run_uuid VARCHAR(32) NOT NULL,
step BIGINT DEFAULT '0' NOT NULL,
is_nan TINYINT DEFAULT '0' NOT NULL,
PRIMARY KEY (key, timestamp, step, run_uuid, value, is_nan),
CONSTRAINT metrics_ibfk_1 FOREIGN KEY(run_uuid) REFERENCES runs (run_uuid),
CONSTRAINT metrics_chk_1 CHECK ((`is_nan` in (0,1))),
CONSTRAINT metrics_chk_2 CHECK ((`is_nan` in (0,1)))
)


CREATE TABLE model_version_tags (
key VARCHAR(250) NOT NULL,
value VARCHAR(5000),
name VARCHAR(256) NOT NULL,
version INTEGER NOT NULL,
PRIMARY KEY (key, name, version),
CONSTRAINT model_version_tags_ibfk_1 FOREIGN KEY(name, version) REFERENCES model_versions (name, version) ON UPDATE CASCADE
)


CREATE TABLE params (
key VARCHAR(250) NOT NULL,
value VARCHAR(250) NOT NULL,
run_uuid VARCHAR(32) NOT NULL,
PRIMARY KEY (key, run_uuid),
CONSTRAINT params_ibfk_1 FOREIGN KEY(run_uuid) REFERENCES runs (run_uuid)
)


CREATE TABLE tags (
key VARCHAR(250) NOT NULL,
value VARCHAR(5000),
run_uuid VARCHAR(32) NOT NULL,
PRIMARY KEY (key, run_uuid),
CONSTRAINT tags_ibfk_1 FOREIGN KEY(run_uuid) REFERENCES runs (run_uuid)
)

0 comments on commit e365e69

Please sign in to comment.