module Blazer
module Adapters
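# Adapter for SQL data sources that connect through Active Record
# (PostgreSQL, MySQL/MariaDB, SQLite, SQL Server, Redshift, Snowflake, and others).
# Each data source gets its own anonymous connection class, so its connection
# pool stays separate from the application's primary database.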
class SqlAdapter < BaseAdapter
attr_reader :connection_model
def initialize(data_source)
super
@connection_model =
Class.new(Blazer::Connection) do
def self.name
"Blazer::Connection::Adapter#{object_id}"
end
establish_connection(data_source.settings["url"]) if data_source.settings["url"]
end
end
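# Runs a statement and returns [columns, rows, error]. The comment is appended
# to the SQL (it carries metadata such as the run_id used by #cancel), and row
# values are cast using the result's column types when they are available.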
def run_statement(statement, comment, bind_params = [])
columns = []
rows = []
error = nil
begin
result = nil
in_transaction do
set_timeout(data_source.timeout) if data_source.timeout
binds = bind_params.map { |v| ActiveRecord::Relation::QueryAttribute.new(nil, v, ActiveRecord::Type::Value.new) }
result = connection_model.connection.select_all("#{statement} /*#{comment}*/", nil, binds)
end
columns = result.columns
rows = result.rows
if result.column_types.any?
types = columns.map { |c| result.column_types[c] }
rows =
rows.map do |row|
row.map.with_index do |v, i|
v && (t = types[i]) ? t.send(:cast_value, v) : v
end
end
end
# fix for non-ASCII column names and charts
if adapter_name == "Trilogy"
columns.map! { |k| k.dup.force_encoding(Encoding::UTF_8) }
end
rescue => e
error = e.message.sub(/.+ERROR: /, "")
error = Blazer::TIMEOUT_MESSAGE if Blazer::TIMEOUT_ERRORS.any? { |e| error.include?(e) }
error = Blazer::VARIABLE_MESSAGE if error.include?("syntax error at or near \"$") || error.include?("Incorrect syntax near '@") || error.include?("your MySQL server version for the right syntax to use near '?")
if error.include?("could not determine data type of parameter")
error += " - try adding casting to variables and make sure none are inside a string literal"
end
reconnect if error.include?("PG::ConnectionBad")
end
[columns, rows, error]
end
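# Lists tables from information_schema. On PostgreSQL, Redshift, and Snowflake,
# tables outside the default schema are returned schema-qualified; other
# adapters return bare table names.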
def tables
sql = add_schemas("SELECT table_schema, table_name FROM information_schema.tables")
result = data_source.run_statement(sql, refresh_cache: true)
if postgresql? || redshift? || snowflake?
result.rows.sort_by { |r| [r[0] == default_schema ? "" : r[0], r[1]] }.map do |row|
table =
if row[0] == default_schema
row[1]
else
"#{row[0]}.#{row[1]}"
end
table = table.downcase if snowflake?
{
table: table,
value: connection_model.connection.quote_table_name(table)
}
end
else
result.rows.map(&:second).sort
end
end
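# Returns columns grouped by [schema, table], with the default schema sorted first.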
def schema
sql = add_schemas("SELECT table_schema, table_name, column_name, data_type, ordinal_position FROM information_schema.columns")
result = data_source.run_statement(sql)
result.rows.group_by { |r| [r[0], r[1]] }.map { |k, vs| {schema: k[0], table: k[1], columns: vs.sort_by { |v| v[2] }.map { |v| {name: v[2], data_type: v[3]} }} }.sort_by { |t| [t[:schema] == default_schema ? "" : t[:schema], t[:table]] }
end
def preview_statement
if sqlserver?
"SELECT TOP (10) * FROM {table}"
else
"SELECT * FROM {table} LIMIT 10"
end
end
def reconnect
connection_model.establish_connection(settings["url"])
end
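# Estimated query cost: TotalSubtreeCost on SQL Server, or the upper bound of
# the top-level cost parsed from EXPLAIN output on PostgreSQL and Redshift.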
def cost(statement)
result = explain(statement)
if sqlserver?
result["TotalSubtreeCost"]
else
match = /cost=\d+\.\d+..(\d+\.\d+) /.match(result)
match[1] if match
end
end
def explain(statement)
if postgresql? || redshift?
select_all("EXPLAIN #{statement}").rows.first.first
elsif sqlserver?
begin
execute("SET SHOWPLAN_ALL ON")
result = select_all(statement).each.first
ensure
execute("SET SHOWPLAN_ALL OFF")
end
result
end
rescue
nil
end
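# Cancels a running query by matching the run_id that run_statement embedded in
# the SQL comment (pg_cancel_backend on PostgreSQL, CANCEL <pid> on Redshift).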
def cancel(run_id)
if postgresql?
select_all("SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND query LIKE ?", ["%,run_id:#{run_id}%"])
elsif redshift?
first_row = select_all("SELECT pid FROM stv_recents WHERE status = 'Running' AND query LIKE ?", ["%,run_id:#{run_id}%"]).first
if first_row
select_all("CANCEL #{first_row["pid"].to_i}")
end
end
end
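# Only statements that don't start with a write/DDL keyword are considered cachable.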
def cachable?(statement)
!%w[CREATE ALTER UPDATE INSERT DELETE].include?(statement.split.first.to_s.upcase)
end
def supports_cohort_analysis?
postgresql? || mysql?
end
# TODO treat date columns as already in time zone
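# Wraps the user's query ({placeholder}) in two CTEs: `query` is the original
# statement and `cohorts` takes each user's earliest cohort_time. Buckets count
# how many `days`-long windows elapsed between cohort_time and conversion_time.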
def cohort_analysis_statement(statement, period:, days:)
raise "Cohort analysis not supported" unless supports_cohort_analysis?
cohort_column = statement =~ /\bcohort_time\b/ ? "cohort_time" : "conversion_time"
tzname = Blazer.time_zone.tzinfo.name
if mysql?
time_sql = "CONVERT_TZ(cohorts.cohort_time, '+00:00', ?)"
case period
when "day"
date_sql = "CAST(DATE_FORMAT(#{time_sql}, '%Y-%m-%d') AS DATE)"
date_params = [tzname]
when "week"
date_sql = "CAST(DATE_FORMAT(#{time_sql} - INTERVAL ((5 + DAYOFWEEK(#{time_sql})) % 7) DAY, '%Y-%m-%d') AS DATE)"
date_params = [tzname, tzname]
else
date_sql = "CAST(DATE_FORMAT(#{time_sql}, '%Y-%m-01') AS DATE)"
date_params = [tzname]
end
bucket_sql = "CAST(CEIL(TIMESTAMPDIFF(SECOND, cohorts.cohort_time, query.conversion_time) / ?) AS SIGNED)"
else
date_sql = "date_trunc(?, cohorts.cohort_time::timestamptz AT TIME ZONE ?)::date"
date_params = [period, tzname]
bucket_sql = "CEIL(EXTRACT(EPOCH FROM query.conversion_time - cohorts.cohort_time) / ?)::int"
end
# WITH not an optimization fence in Postgres 12+
statement = <<~SQL
WITH query AS (
{placeholder}
),
cohorts AS (
SELECT user_id, MIN(#{cohort_column}) AS cohort_time FROM query
WHERE user_id IS NOT NULL AND #{cohort_column} IS NOT NULL
GROUP BY 1
)
SELECT
#{date_sql} AS period,
0 AS bucket,
COUNT(DISTINCT cohorts.user_id)
FROM cohorts GROUP BY 1
UNION ALL
SELECT
#{date_sql} AS period,
#{bucket_sql} AS bucket,
COUNT(DISTINCT query.user_id)
FROM cohorts INNER JOIN query ON query.user_id = cohorts.user_id
WHERE query.conversion_time IS NOT NULL
AND query.conversion_time >= cohorts.cohort_time
#{cohort_column == "conversion_time" ? "AND query.conversion_time != cohorts.cohort_time" : ""}
GROUP BY 1, 2
SQL
params = [statement] + date_params + date_params + [days.to_i * 86400]
connection_model.send(:sanitize_sql_array, params)
end
def quoting
->(value) { connection_model.connection.quote(value) }
end
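# How bind parameters are passed to the adapter: :numeric ($1, $2, ...),
# :positional (?), a proc that rewrites {placeholders} for SQL Server, or nil
# when none of these apply.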
# Redshift adapter silently ignores binds
def parameter_binding
if postgresql?
:numeric
elsif sqlite? && prepared_statements?
# Active Record silently ignores binds with SQLite when prepared statements are disabled
:numeric
elsif mysql? && prepared_statements?
# Active Record silently ignores binds with MySQL when prepared statements are disabled
:positional
elsif sqlserver?
proc do |statement, variables|
variables.each_with_index do |(k, _), i|
statement = statement.gsub("{#{k}}", "@#{i} ")
end
[statement, variables.values]
end
end
end
protected
def select_all(statement, params = [])
statement = connection_model.send(:sanitize_sql_array, [statement] + params) if params.any?
connection_model.connection.select_all(statement)
end
# separate from select_all to prevent a MySQL error
def execute(statement)
connection_model.connection.execute(statement)
end
def postgresql?
["PostgreSQL", "PostGIS"].include?(adapter_name)
end
def redshift?
["Redshift"].include?(adapter_name)
end
def mysql?
["MySQL", "Mysql2", "Mysql2Spatial", "Trilogy"].include?(adapter_name)
end
def sqlite?
["SQLite"].include?(adapter_name)
end
def sqlserver?
["SQLServer", "tinytds", "mssql"].include?(adapter_name)
end
def snowflake?
data_source.adapter == "snowflake"
end
def adapter_name
# prevent bad data source from taking down queries/new
connection_model.connection.adapter_name rescue nil
end
def default_schema
@default_schema ||= begin
if postgresql? || redshift?
"public"
elsif sqlserver?
"dbo"
elsif connection_model.respond_to?(:connection_db_config)
connection_model.connection_db_config.database
else
connection_model.connection_config[:database]
end
end
end
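# Scopes information_schema queries to the configured "schemas" setting, the
# current database on MySQL, or everything except system schemas otherwise.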
def add_schemas(query)
if settings["schemas"]
where = "table_schema IN (?)"
schemas = settings["schemas"]
elsif mysql?
where = "table_schema IN (?)"
schemas = [default_schema]
else
where = "table_schema NOT IN (?)"
schemas = ["information_schema"]
schemas.map!(&:upcase) if snowflake?
schemas << "pg_catalog" if postgresql? || redshift?
end
connection_model.send(:sanitize_sql_array, ["#{query} WHERE #{where}", schemas])
end
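# Applies the per-query timeout: statement_timeout on PostgreSQL/Redshift
# (SET LOCAL when running inside a transaction), max_statement_time on MariaDB,
# max_execution_time on MySQL; other adapters raise Blazer::TimeoutNotSupported.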
def set_timeout(timeout)
if postgresql? || redshift?
execute("SET #{use_transaction? ? "LOCAL " : ""}statement_timeout = #{timeout.to_i * 1000}")
elsif mysql?
# use send as this method is private in Rails 4.2
mariadb = connection_model.connection.send(:mariadb?) rescue false
if mariadb
execute("SET max_statement_time = #{timeout.to_i * 1000}")
else
execute("SET max_execution_time = #{timeout.to_i * 1000}")
end
else
raise Blazer::TimeoutNotSupported, "Timeout not supported for #{adapter_name} adapter"
end
end
def use_transaction?
settings.key?("use_transaction") ? settings["use_transaction"] : true
end
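# Yields inside a transaction that is always rolled back (unless use_transaction
# is disabled), so statements with side effects don't persist.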
def in_transaction
connection_model.connection_pool.with_connection do
if use_transaction?
connection_model.transaction do
yield
raise ActiveRecord::Rollback
end
else
yield
end
end
end
def prepared_statements?
connection_model.connection.prepared_statements
end
end
end
end