Skip to content

Commit

Permalink
stop using python semver parsing for mysql versions (#11868)
Browse files Browse the repository at this point in the history
### Summary & Motivation
We were using `packaging.parse` to compare mysql server versions, which
broke when `0.23.0` was released with dropped support for
`LegacyVersion`.

This rolls our own custom version compares to find minimum supported
versions, which is tolerant of non semver compliant version strings.

The regex parses all the numeric values, and does tuple int comparisons.

Reported in #11794

Reference links:
pypa/packaging#407

### How I Tested These Changes
Added some test cases that override the server version string to
exercise the parsing logic.
  • Loading branch information
prha authored and gibsondan committed Jan 27, 2023
1 parent 02b9ad0 commit 57c18ba
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 7 deletions.
Expand Up @@ -16,12 +16,12 @@
stamp_alembic_rev,
)
from dagster._serdes import ConfigurableClass, ConfigurableClassData
from packaging.version import parse

from ..utils import (
create_mysql_connection,
mysql_alembic_config,
mysql_url_from_config,
parse_mysql_version,
retry_mysql_connection_fn,
retry_mysql_creation_fn,
)
Expand Down Expand Up @@ -195,7 +195,9 @@ def end_watch(self, run_id, handler):

@property
def supports_intersect(self):
return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_INTERSECT_VERSION)
return parse_mysql_version(self._mysql_version) >= parse_mysql_version(
MINIMUM_MYSQL_INTERSECT_VERSION
)

@property
def event_watcher(self):
Expand Down
Expand Up @@ -18,12 +18,12 @@
)
from dagster._serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple
from dagster._utils import utc_datetime_from_timestamp
from packaging.version import parse

from ..utils import (
create_mysql_connection,
mysql_alembic_config,
mysql_url_from_config,
parse_mysql_version,
retry_mysql_connection_fn,
retry_mysql_creation_fn,
)
Expand Down Expand Up @@ -154,11 +154,15 @@ def supports_bucket_queries(self):
if not self._mysql_version:
return False

return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_BUCKET_VERSION)
return parse_mysql_version(self._mysql_version) >= parse_mysql_version(
MINIMUM_MYSQL_BUCKET_VERSION
)

@property
def supports_intersect(self):
return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_INTERSECT_VERSION)
return parse_mysql_version(self._mysql_version) >= parse_mysql_version(
MINIMUM_MYSQL_INTERSECT_VERSION
)

def add_daemon_heartbeat(self, daemon_heartbeat):
with self.connect() as conn:
Expand Down
Expand Up @@ -11,12 +11,12 @@
stamp_alembic_rev,
)
from dagster._serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple
from packaging.version import parse

from ..utils import (
create_mysql_connection,
mysql_alembic_config,
mysql_url_from_config,
parse_mysql_version,
retry_mysql_connection_fn,
retry_mysql_creation_fn,
)
Expand Down Expand Up @@ -117,7 +117,9 @@ def supports_batch_queries(self):
if not self._mysql_version:
return False

return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_BATCH_VERSION)
return parse_mysql_version(self._mysql_version) >= parse_mysql_version(
MINIMUM_MYSQL_BATCH_VERSION
)

def get_server_version(self):
rows = self.execute("select version()")
Expand Down
21 changes: 21 additions & 0 deletions python_modules/libraries/dagster-mysql/dagster_mysql/utils.py
@@ -1,4 +1,5 @@
import logging
import re
import time
from contextlib import contextmanager
from urllib.parse import (
Expand Down Expand Up @@ -46,6 +47,26 @@ def get_conn_string(username, password, hostname, db_name, port="3306"):
)


def parse_mysql_version(version: str) -> tuple:
"""Parse MySQL version into a tuple of ints.
Args:
version (str): MySQL version string.
Returns:
tuple: Tuple of ints representing the MySQL version.
"""
parsed = []
for part in re.split(r"\D+", version):
if len(part) == 0:
continue
try:
parsed.append(int(part))
except ValueError:
continue
return tuple(parsed)


def retry_mysql_creation_fn(fn, retry_limit=5, retry_wait=0.2):
# Retry logic to recover from the case where two processes are creating
# tables at the same time using sqlalchemy
Expand Down
Expand Up @@ -4,6 +4,7 @@
import yaml
from dagster._core.test_utils import environ, instance_for_test
from dagster_mysql.run_storage import MySQLRunStorage
from dagster_mysql.utils import parse_mysql_version
from dagster_tests.storage_tests.utils.run_storage import TestRunStorage

TestRunStorage.__test__ = False
Expand Down Expand Up @@ -83,3 +84,23 @@ def test_load_from_config(self, conn_string):
from_url_instance._run_storage.mysql_url
== from_env_instance._run_storage.mysql_url
)


def test_mysql_version(conn_string):
class FakeNonBucketing(MySQLRunStorage):
def get_server_version(self):
# override the server version to make sure the parsing works
return "5.7.38-log"

storage = FakeNonBucketing(conn_string)
assert parse_mysql_version("5.7.38-log") == (5, 7, 38)
assert not storage.supports_bucket_queries

class FakeBucketing(MySQLRunStorage):
def get_server_version(self):
# override the server version to make sure the parsing works
return "8.0.31-google"

storage = FakeBucketing(conn_string)
assert parse_mysql_version("8.0.31-google") == (8, 0, 31)
assert storage.supports_bucket_queries

0 comments on commit 57c18ba

Please sign in to comment.