/
utils.py
170 lines (138 loc) · 4.84 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import logging
import re
import time
from contextlib import contextmanager
from urllib.parse import (
quote_plus as urlquote,
urlparse,
)
import mysql.connector as mysql
import sqlalchemy as db
from dagster import _check as check
from dagster._core.storage.sql import get_alembic_config
class DagsterMySQLException(Exception):
    """Error raised by the MySQL retry helpers in this module when retries run out."""
def get_conn(conn_string):
    """Open a ``mysql.connector`` connection described by a connection URL.

    Args:
        conn_string (str): URL of the form
            ``scheme://user:password@host:port/database``.

    Returns:
        The connection object returned by ``mysql.connect``.
    """
    # Imported locally so the fix does not rely on the module-level import list.
    from urllib.parse import unquote

    parsed = urlparse(conn_string)

    def _decoded(component):
        # urlparse keeps percent-escapes as-is, while get_conn_string
        # percent-encodes the password; decode so the driver receives the
        # real credential. Components absent from the URL stay None.
        return unquote(component) if component is not None else None

    conn = mysql.connect(
        user=_decoded(parsed.username),
        passwd=_decoded(parsed.password),
        host=parsed.hostname,
        database=parsed.path[1:],  # Skip first char, URL parser retains leading "/"
        port=parsed.port,
    )
    # https://github.com/dagster-io/dagster/issues/3735
    return conn
def mysql_url_from_config(config_value):
    """Resolve the MySQL connection URL from a storage config mapping.

    A truthy ``"mysql_url"`` entry is returned unchanged. Otherwise the
    ``"mysql_db"`` entry is expanded as keyword arguments to
    ``get_conn_string`` and the resulting URL is returned.
    """
    explicit_url = config_value.get("mysql_url")
    if not explicit_url:
        return get_conn_string(**config_value["mysql_db"])
    return explicit_url
def get_conn_string(username, password, hostname, db_name, port="3306"):
    """Build a ``mysql+mysqlconnector://`` connection URL from its parts.

    Only the password is percent-encoded (via ``quote_plus``); every other
    component is inserted into the URL as given.
    """
    template = "mysql+mysqlconnector://{username}:{password}@{hostname}:{port}/{db_name}"
    fields = {
        "username": username,
        "password": urlquote(password),
        "hostname": hostname,
        "db_name": db_name,
        "port": port,
    }
    return template.format(**fields)
def parse_mysql_version(version: str) -> tuple:
    """Parse MySQL version into a tuple of ints.

    Each maximal run of decimal digits in ``version`` becomes one integer;
    everything else (dots, dashes, suffixes such as ``-log``) only acts as
    a separator.

    Args:
        version (str): MySQL version string.

    Returns:
        tuple: Tuple of ints representing the MySQL version. Empty when the
        string contains no digits.
    """
    # Matching digit runs directly is equivalent to splitting on non-digits
    # and dropping empty pieces, and every match is guaranteed to be a valid
    # int literal, so no ValueError handling is needed.
    return tuple(int(digits) for digits in re.findall(r"\d+", version))
def retry_mysql_creation_fn(fn, retry_limit=5, retry_wait=0.2):
    """Call ``fn`` and retry it when table creation fails.

    Retry logic to recover from the case where two processes are creating
    tables at the same time using sqlalchemy.

    Args:
        fn (Callable): Zero-argument callable to run.
        retry_limit (int): Number of retries allowed after the first failed
            attempt. Zero or a negative value means no retries.
        retry_wait (float): Seconds to sleep between attempts.

    Returns:
        Whatever ``fn`` returns on its first successful call.

    Raises:
        DagsterMySQLException: When ``fn`` keeps failing after the retry
            budget is used up; the last error is attached as the cause.
        The original ProgrammingError: Re-raised immediately, without
            retrying, when its error number is ``ER_TABLE_EXISTS_ERROR``.
    """
    check.callable_param(fn, "fn")
    check.int_param(retry_limit, "retry_limit")
    check.numeric_param(retry_wait, "retry_wait")

    while True:
        try:
            return fn()
        except (
            mysql.ProgrammingError,
            mysql.IntegrityError,
            db.exc.ProgrammingError,
            db.exc.IntegrityError,
        ) as exc:
            if isinstance(exc, db.exc.ProgrammingError):
                # SQLAlchemy wraps the driver error in ``orig``; it may be
                # missing or lack ``errno``, so read it defensively rather
                # than masking the real failure with an AttributeError.
                errno = getattr(exc.orig, "errno", None)
            elif isinstance(exc, mysql.ProgrammingError):
                errno = exc.errno
            else:
                errno = None

            if errno == mysql.errorcode.ER_TABLE_EXISTS_ERROR:
                raise

            logging.warning("Retrying failed database creation")
            # ``<=`` rather than ``==`` so a negative limit cannot loop forever.
            if retry_limit <= 0:
                raise DagsterMySQLException("too many retries for DB creation") from exc
            time.sleep(retry_wait)
            retry_limit -= 1
def retry_mysql_connection_fn(fn, retry_limit=5, retry_wait=0.2):
    """Reusable retry logic for any MySQL connection functions that may fail.

    Intended to be used anywhere we connect to MySQL, to gracefully handle
    transient connection issues.

    Args:
        fn (Callable): Zero-argument callable to run.
        retry_limit (int): Number of retries allowed after the first failed
            attempt. Zero or a negative value means no retries.
        retry_wait (float): Seconds to sleep between attempts.

    Returns:
        Whatever ``fn`` returns on its first successful call.

    Raises:
        DagsterMySQLException: When ``fn`` keeps raising one of the handled
            database/interface errors after the retry budget is used up;
            the last error is attached as the cause.
    """
    check.callable_param(fn, "fn")
    check.int_param(retry_limit, "retry_limit")
    check.numeric_param(retry_wait, "retry_wait")

    retryable_errors = (
        mysql.DatabaseError,
        mysql.OperationalError,
        db.exc.DatabaseError,
        db.exc.OperationalError,
        mysql.errors.InterfaceError,
    )

    while True:
        try:
            return fn()
        except retryable_errors as exc:
            logging.warning("Retrying failed database connection")
            # ``<=`` rather than ``==`` so a negative limit cannot loop forever.
            if retry_limit <= 0:
                raise DagsterMySQLException("too many retries for DB connection") from exc
            time.sleep(retry_wait)
            retry_limit -= 1
def wait_for_connection(conn_string, retry_limit=5, retry_wait=0.2):
    """Block until a MySQL connection can be opened, retrying on failure.

    Args:
        conn_string (str): URL of the form
            ``scheme://user:password@host:port/database``.
        retry_limit (int): Forwarded to ``retry_mysql_connection_fn``.
        retry_wait (float): Forwarded to ``retry_mysql_connection_fn``.

    Returns:
        bool: Always ``True`` once a connection attempt has succeeded.

    Raises:
        DagsterMySQLException: Propagated from ``retry_mysql_connection_fn``
            when the connection keeps failing.
    """
    # Imported locally so the fix does not rely on the module-level import list.
    from urllib.parse import unquote

    parsed = urlparse(conn_string)

    def _decoded(component):
        # urlparse keeps percent-escapes as-is, while get_conn_string
        # percent-encodes the password; decode so the driver receives the
        # real credential. Components absent from the URL stay None.
        return unquote(component) if component is not None else None

    def _connect():
        return mysql.connect(
            user=_decoded(parsed.username),
            passwd=_decoded(parsed.password),
            host=parsed.hostname,
            database=parsed.path[1:],  # Skip first char, URL parser retains leading "/"
            port=parsed.port,
        )

    probe = retry_mysql_connection_fn(
        _connect,
        retry_limit=retry_limit,
        retry_wait=retry_wait,
    )
    # The connection only proves the server is reachable; close it rather
    # than leaving it open until garbage collection.
    if probe is not None:
        probe.close()
    return True
def mysql_alembic_config(dunder_file):
    """Return ``get_alembic_config`` for ``dunder_file`` using this package's ini path.

    The config path passed along is the fixed relative location
    ``../alembic/alembic.ini``.
    """
    ini_location = "../alembic/alembic.ini"
    return get_alembic_config(dunder_file, config_path=ini_location)
@contextmanager
def create_mysql_connection(engine, dunder_file, storage_type_desc=None):
    """Context manager that yields a connection from ``engine`` and closes it on exit.

    Args:
        engine (sqlalchemy.engine.Engine): Engine to connect with.
        dunder_file (str): Validated as a string; not otherwise used here.
        storage_type_desc (Optional[str]): Validated as an optional string;
            not otherwise used here.

    Yields:
        The connection returned by ``engine.connect``, obtained through
        ``retry_mysql_connection_fn``.
    """
    check.inst_param(engine, "engine", db.engine.Engine)
    check.str_param(dunder_file, "dunder_file")
    check.opt_str_param(storage_type_desc, "storage_type_desc", "")

    conn = None
    try:
        # Retry connection to gracefully handle transient connection issues
        conn = retry_mysql_connection_fn(engine.connect)
        yield conn
    finally:
        # ``conn`` is still None if connecting itself failed.
        if conn is not None:
            conn.close()