diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql/event_log/event_log.py b/python_modules/libraries/dagster-mysql/dagster_mysql/event_log/event_log.py index 8bb0da9a55d4..0fba7e467c1f 100644 --- a/python_modules/libraries/dagster-mysql/dagster_mysql/event_log/event_log.py +++ b/python_modules/libraries/dagster-mysql/dagster_mysql/event_log/event_log.py @@ -16,12 +16,12 @@ stamp_alembic_rev, ) from dagster._serdes import ConfigurableClass, ConfigurableClassData -from packaging.version import parse from ..utils import ( create_mysql_connection, mysql_alembic_config, mysql_url_from_config, + parse_mysql_version, retry_mysql_connection_fn, retry_mysql_creation_fn, ) @@ -195,7 +195,9 @@ def end_watch(self, run_id, handler): @property def supports_intersect(self): - return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_INTERSECT_VERSION) + return parse_mysql_version(self._mysql_version) >= parse_mysql_version( + MINIMUM_MYSQL_INTERSECT_VERSION + ) @property def event_watcher(self): diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql/run_storage/run_storage.py b/python_modules/libraries/dagster-mysql/dagster_mysql/run_storage/run_storage.py index 553f2565c0ed..4307dec140e9 100644 --- a/python_modules/libraries/dagster-mysql/dagster_mysql/run_storage/run_storage.py +++ b/python_modules/libraries/dagster-mysql/dagster_mysql/run_storage/run_storage.py @@ -18,12 +18,12 @@ ) from dagster._serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple from dagster._utils import utc_datetime_from_timestamp -from packaging.version import parse from ..utils import ( create_mysql_connection, mysql_alembic_config, mysql_url_from_config, + parse_mysql_version, retry_mysql_connection_fn, retry_mysql_creation_fn, ) @@ -154,11 +154,15 @@ def supports_bucket_queries(self): if not self._mysql_version: return False - return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_BUCKET_VERSION) + return parse_mysql_version(self._mysql_version) >= parse_mysql_version( + MINIMUM_MYSQL_BUCKET_VERSION + ) @property def supports_intersect(self): - return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_INTERSECT_VERSION) + return parse_mysql_version(self._mysql_version) >= parse_mysql_version( + MINIMUM_MYSQL_INTERSECT_VERSION + ) def add_daemon_heartbeat(self, daemon_heartbeat): with self.connect() as conn: diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql/schedule_storage/schedule_storage.py b/python_modules/libraries/dagster-mysql/dagster_mysql/schedule_storage/schedule_storage.py index 127303681704..4149b0a730d8 100644 --- a/python_modules/libraries/dagster-mysql/dagster_mysql/schedule_storage/schedule_storage.py +++ b/python_modules/libraries/dagster-mysql/dagster_mysql/schedule_storage/schedule_storage.py @@ -11,12 +11,12 @@ stamp_alembic_rev, ) from dagster._serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple -from packaging.version import parse from ..utils import ( create_mysql_connection, mysql_alembic_config, mysql_url_from_config, + parse_mysql_version, retry_mysql_connection_fn, retry_mysql_creation_fn, ) @@ -117,7 +117,9 @@ def supports_batch_queries(self): if not self._mysql_version: return False - return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_BATCH_VERSION) + return parse_mysql_version(self._mysql_version) >= parse_mysql_version( + MINIMUM_MYSQL_BATCH_VERSION + ) def get_server_version(self): rows = self.execute("select version()") diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql/utils.py b/python_modules/libraries/dagster-mysql/dagster_mysql/utils.py index 1f8852b5a5b4..5a025b1eb61f 100644 --- a/python_modules/libraries/dagster-mysql/dagster_mysql/utils.py +++ b/python_modules/libraries/dagster-mysql/dagster_mysql/utils.py @@ -1,4 +1,5 @@ import logging +import re import time from contextlib import contextmanager from urllib.parse import ( @@ -46,6 +47,26 @@ def get_conn_string(username, password, hostname, db_name, port="3306"): ) +def parse_mysql_version(version: str) -> tuple: + """Parse MySQL version into a tuple of ints. + + Args: + version (str): MySQL version string. + + Returns: + tuple: Tuple of ints representing the MySQL version. + """ + parsed = [] + for part in re.split(r"\D+", version): + if len(part) == 0: + continue + try: + parsed.append(int(part)) + except ValueError: + continue + return tuple(parsed) + + def retry_mysql_creation_fn(fn, retry_limit=5, retry_wait=0.2): # Retry logic to recover from the case where two processes are creating # tables at the same time using sqlalchemy diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_run_storage.py b/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_run_storage.py index 05dffe5dd810..5867a1cbee02 100644 --- a/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_run_storage.py +++ b/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_run_storage.py @@ -4,6 +4,7 @@ import yaml from dagster._core.test_utils import environ, instance_for_test from dagster_mysql.run_storage import MySQLRunStorage +from dagster_mysql.utils import parse_mysql_version from dagster_tests.storage_tests.utils.run_storage import TestRunStorage TestRunStorage.__test__ = False @@ -83,3 +84,23 @@ def test_load_from_config(self, conn_string): from_url_instance._run_storage.mysql_url == from_env_instance._run_storage.mysql_url ) + + +def test_mysql_version(conn_string): + class FakeNonBucketing(MySQLRunStorage): + def get_server_version(self): + # override the server version to make sure the parsing works + return "5.7.38-log" + + storage = FakeNonBucketing(conn_string) + assert parse_mysql_version("5.7.38-log") == (5, 7, 38) + assert not storage.supports_bucket_queries + + class FakeBucketing(MySQLRunStorage): + def get_server_version(self): + # override the server version to make sure the parsing works + return "8.0.31-google" + + storage = FakeBucketing(conn_string) + assert parse_mysql_version("8.0.31-google") == (8, 0, 31) + assert storage.supports_bucket_queries