Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stop using python semver parsing for mysql versions #11868

Merged
merged 4 commits into from Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -16,12 +16,12 @@
stamp_alembic_rev,
)
from dagster._serdes import ConfigurableClass, ConfigurableClassData
from packaging.version import parse

from ..utils import (
create_mysql_connection,
mysql_alembic_config,
mysql_url_from_config,
parse_mysql_version,
retry_mysql_connection_fn,
retry_mysql_creation_fn,
)
Expand Down Expand Up @@ -195,7 +195,9 @@ def end_watch(self, run_id, handler):

@property
def supports_intersect(self):
return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_INTERSECT_VERSION)
return parse_mysql_version(self._mysql_version) >= parse_mysql_version(
MINIMUM_MYSQL_INTERSECT_VERSION
)

@property
def event_watcher(self):
Expand Down
Expand Up @@ -18,12 +18,12 @@
)
from dagster._serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple
from dagster._utils import utc_datetime_from_timestamp
from packaging.version import parse

from ..utils import (
create_mysql_connection,
mysql_alembic_config,
mysql_url_from_config,
parse_mysql_version,
retry_mysql_connection_fn,
retry_mysql_creation_fn,
)
Expand Down Expand Up @@ -154,11 +154,15 @@ def supports_bucket_queries(self):
if not self._mysql_version:
return False

return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_BUCKET_VERSION)
return parse_mysql_version(self._mysql_version) >= parse_mysql_version(
MINIMUM_MYSQL_BUCKET_VERSION
)

@property
def supports_intersect(self):
return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_INTERSECT_VERSION)
return parse_mysql_version(self._mysql_version) >= parse_mysql_version(
MINIMUM_MYSQL_INTERSECT_VERSION
)

def add_daemon_heartbeat(self, daemon_heartbeat):
with self.connect() as conn:
Expand Down
Expand Up @@ -11,12 +11,12 @@
stamp_alembic_rev,
)
from dagster._serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple
from packaging.version import parse

from ..utils import (
create_mysql_connection,
mysql_alembic_config,
mysql_url_from_config,
parse_mysql_version,
retry_mysql_connection_fn,
retry_mysql_creation_fn,
)
Expand Down Expand Up @@ -117,7 +117,9 @@ def supports_batch_queries(self):
if not self._mysql_version:
return False

return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_BATCH_VERSION)
return parse_mysql_version(self._mysql_version) >= parse_mysql_version(
MINIMUM_MYSQL_BATCH_VERSION
)

def get_server_version(self):
rows = self.execute("select version()")
Expand Down
21 changes: 21 additions & 0 deletions python_modules/libraries/dagster-mysql/dagster_mysql/utils.py
@@ -1,4 +1,5 @@
import logging
import re
import time
from contextlib import contextmanager
from urllib.parse import (
Expand Down Expand Up @@ -46,6 +47,26 @@ def get_conn_string(username, password, hostname, db_name, port="3306"):
)


def parse_mysql_version(version: str) -> tuple:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some test cases that use this directly too

"""Parse MySQL version into a tuple of ints.

Args:
version (str): MySQL version string.

Returns:
tuple: Tuple of ints representing the MySQL version.
"""
parsed = []
for part in re.split(r"\D+", version):
if len(part) == 0:
continue
try:
parsed.append(int(part))
except ValueError:
continue
return tuple(parsed)


def retry_mysql_creation_fn(fn, retry_limit=5, retry_wait=0.2):
# Retry logic to recover from the case where two processes are creating
# tables at the same time using sqlalchemy
Expand Down
Expand Up @@ -83,3 +83,21 @@ def test_load_from_config(self, conn_string):
from_url_instance._run_storage.mysql_url
== from_env_instance._run_storage.mysql_url
)


def test_mysql_version(conn_string):
class FakeNonBucketing(MySQLRunStorage):
def get_server_version(self):
# override the server version to make sure the parsing works
return "5.7.38-log"

storage = FakeNonBucketing(conn_string)
assert not storage.supports_bucket_queries

class FakeBucketing(MySQLRunStorage):
def get_server_version(self):
# override the server version to make sure the parsing works
return "8.0.31-google"

storage = FakeBucketing(conn_string)
assert storage.supports_bucket_queries