From 57c18ba727c062d8c45ac6b5b140d3dc21378273 Mon Sep 17 00:00:00 2001 From: prha <1040172+prha@users.noreply.github.com> Date: Fri, 27 Jan 2023 08:33:59 -0800 Subject: [PATCH] stop using python semver parsing for mysql versions (#11868) ### Summary & Motivation We were using `packaging.parse` to compare mysql server versions, which broke when `0.23.0` was released with dropped support for `LegacyVersion`. This rolls our own custom version compares to find minimum supported versions, which is tolerant of non semver compliant version strings. The regex parses all the numeric values, and does tuple int comparisons. Reported in https://github.com/dagster-io/dagster/issues/11794 Reference links: https://github.com/pypa/packaging/pull/407 ### How I Tested These Changes Added some test cases that override the server version string to exercise the parsing logic. --- .../dagster_mysql/event_log/event_log.py | 6 ++++-- .../dagster_mysql/run_storage/run_storage.py | 10 ++++++--- .../schedule_storage/schedule_storage.py | 6 ++++-- .../dagster-mysql/dagster_mysql/utils.py | 21 +++++++++++++++++++ .../dagster_mysql_tests/test_run_storage.py | 21 +++++++++++++++++++ 5 files changed, 57 insertions(+), 7 deletions(-) diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql/event_log/event_log.py b/python_modules/libraries/dagster-mysql/dagster_mysql/event_log/event_log.py index 8bb0da9a55d4..0fba7e467c1f 100644 --- a/python_modules/libraries/dagster-mysql/dagster_mysql/event_log/event_log.py +++ b/python_modules/libraries/dagster-mysql/dagster_mysql/event_log/event_log.py @@ -16,12 +16,12 @@ stamp_alembic_rev, ) from dagster._serdes import ConfigurableClass, ConfigurableClassData -from packaging.version import parse from ..utils import ( create_mysql_connection, mysql_alembic_config, mysql_url_from_config, + parse_mysql_version, retry_mysql_connection_fn, retry_mysql_creation_fn, ) @@ -195,7 +195,9 @@ def end_watch(self, run_id, handler): @property def supports_intersect(self): - return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_INTERSECT_VERSION) + return parse_mysql_version(self._mysql_version) >= parse_mysql_version( + MINIMUM_MYSQL_INTERSECT_VERSION + ) @property def event_watcher(self): diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql/run_storage/run_storage.py b/python_modules/libraries/dagster-mysql/dagster_mysql/run_storage/run_storage.py index 553f2565c0ed..4307dec140e9 100644 --- a/python_modules/libraries/dagster-mysql/dagster_mysql/run_storage/run_storage.py +++ b/python_modules/libraries/dagster-mysql/dagster_mysql/run_storage/run_storage.py @@ -18,12 +18,12 @@ ) from dagster._serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple from dagster._utils import utc_datetime_from_timestamp -from packaging.version import parse from ..utils import ( create_mysql_connection, mysql_alembic_config, mysql_url_from_config, + parse_mysql_version, retry_mysql_connection_fn, retry_mysql_creation_fn, ) @@ -154,11 +154,15 @@ def supports_bucket_queries(self): if not self._mysql_version: return False - return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_BUCKET_VERSION) + return parse_mysql_version(self._mysql_version) >= parse_mysql_version( + MINIMUM_MYSQL_BUCKET_VERSION + ) @property def supports_intersect(self): - return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_INTERSECT_VERSION) + return parse_mysql_version(self._mysql_version) >= parse_mysql_version( + MINIMUM_MYSQL_INTERSECT_VERSION + ) def add_daemon_heartbeat(self, daemon_heartbeat): with self.connect() as conn: diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql/schedule_storage/schedule_storage.py b/python_modules/libraries/dagster-mysql/dagster_mysql/schedule_storage/schedule_storage.py index 127303681704..4149b0a730d8 100644 --- a/python_modules/libraries/dagster-mysql/dagster_mysql/schedule_storage/schedule_storage.py +++ b/python_modules/libraries/dagster-mysql/dagster_mysql/schedule_storage/schedule_storage.py @@ -11,12 +11,12 @@ stamp_alembic_rev, ) from dagster._serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple -from packaging.version import parse from ..utils import ( create_mysql_connection, mysql_alembic_config, mysql_url_from_config, + parse_mysql_version, retry_mysql_connection_fn, retry_mysql_creation_fn, ) @@ -117,7 +117,9 @@ def supports_batch_queries(self): if not self._mysql_version: return False - return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_BATCH_VERSION) + return parse_mysql_version(self._mysql_version) >= parse_mysql_version( + MINIMUM_MYSQL_BATCH_VERSION + ) def get_server_version(self): rows = self.execute("select version()") diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql/utils.py b/python_modules/libraries/dagster-mysql/dagster_mysql/utils.py index 1f8852b5a5b4..5a025b1eb61f 100644 --- a/python_modules/libraries/dagster-mysql/dagster_mysql/utils.py +++ b/python_modules/libraries/dagster-mysql/dagster_mysql/utils.py @@ -1,4 +1,5 @@ import logging +import re import time from contextlib import contextmanager from urllib.parse import ( @@ -46,6 +47,26 @@ def get_conn_string(username, password, hostname, db_name, port="3306"): ) +def parse_mysql_version(version: str) -> tuple: + """Parse MySQL version into a tuple of ints. + + Args: + version (str): MySQL version string. + + Returns: + tuple: Tuple of ints representing the MySQL version. + """ + parsed = [] + for part in re.split(r"\D+", version): + if len(part) == 0: + continue + try: + parsed.append(int(part)) + except ValueError: + continue + return tuple(parsed) + + def retry_mysql_creation_fn(fn, retry_limit=5, retry_wait=0.2): # Retry logic to recover from the case where two processes are creating # tables at the same time using sqlalchemy diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_run_storage.py b/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_run_storage.py index 05dffe5dd810..5867a1cbee02 100644 --- a/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_run_storage.py +++ b/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_run_storage.py @@ -4,6 +4,7 @@ import yaml from dagster._core.test_utils import environ, instance_for_test from dagster_mysql.run_storage import MySQLRunStorage +from dagster_mysql.utils import parse_mysql_version from dagster_tests.storage_tests.utils.run_storage import TestRunStorage TestRunStorage.__test__ = False @@ -83,3 +84,23 @@ def test_load_from_config(self, conn_string): from_url_instance._run_storage.mysql_url == from_env_instance._run_storage.mysql_url ) + + +def test_mysql_version(conn_string): + class FakeNonBucketing(MySQLRunStorage): + def get_server_version(self): + # override the server version to make sure the parsing works + return "5.7.38-log" + + storage = FakeNonBucketing(conn_string) + assert parse_mysql_version("5.7.38-log") == (5, 7, 38) + assert not storage.supports_bucket_queries + + class FakeBucketing(MySQLRunStorage): + def get_server_version(self): + # override the server version to make sure the parsing works + return "8.0.31-google" + + storage = FakeBucketing(conn_string) + assert parse_mysql_version("8.0.31-google") == (8, 0, 31) + assert storage.supports_bucket_queries