forked from googleapis/google-cloud-ruby
-
Notifications
You must be signed in to change notification settings - Fork 2
/
job.rb
463 lines (426 loc) · 13.5 KB
/
job.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
# Copyright 2015 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require "google/cloud/errors"
require "google/cloud/bigquery/service"
require "google/cloud/bigquery/job/list"
require "google/cloud/bigquery/convert"
require "json"
module Google
  module Cloud
    module Bigquery
      ##
      # # Job
      #
      # Represents a generic Job that may be performed on a {Table}.
      #
      # The subclasses of Job represent the specific BigQuery job types:
      # {CopyJob}, {ExtractJob}, {LoadJob}, and {QueryJob}.
      #
      # A job instance is created when you call {Project#query_job},
      # {Dataset#query_job}, {Table#copy_job}, {Table#extract_job}, or
      # {Table#load_job}.
      #
      # @see https://cloud.google.com/bigquery/docs/managing-jobs Running and
      #   Managing Jobs
      # @see https://cloud.google.com/bigquery/docs/reference/v2/jobs Jobs API
      #   reference
      #
      # @example
      #   require "google/cloud/bigquery"
      #
      #   bigquery = Google::Cloud::Bigquery.new
      #
      #   job = bigquery.query_job "SELECT COUNT(word) as count FROM " \
      #                            "`bigquery-public-data.samples.shakespeare`"
      #
      #   job.wait_until_done!
      #
      #   if job.failed?
      #     puts job.error
      #   else
      #     puts job.data.first
      #   end
      #
      class Job
        ##
        # @private Status code that {#gapi_error} reports for each BigQuery
        # error `reason`. Reasons not listed here map to `0`. Frozen and built
        # once instead of on every lookup.
        STATUS_CODES_BY_REASON = {
          "accessDenied" => 403,
          "backendError" => 500,
          "billingNotEnabled" => 403,
          "billingTierLimitExceeded" => 400,
          "blocked" => 403,
          "duplicate" => 409,
          "internalError" => 500,
          "invalid" => 400,
          "invalidQuery" => 400,
          "notFound" => 404,
          "notImplemented" => 501,
          "quotaExceeded" => 403,
          "rateLimitExceeded" => 403,
          "resourceInUse" => 400,
          "resourcesExceeded" => 400,
          "responseTooLarge" => 403,
          "tableUnavailable" => 400
        }.freeze
        private_constant :STATUS_CODES_BY_REASON

        ##
        # @private The Service object.
        attr_accessor :service

        ##
        # @private The Google API Client object.
        attr_accessor :gapi

        ##
        # @private Create an empty Job object.
        def initialize
          @service = nil
          @gapi = {}
        end

        ##
        # The ID of the job.
        #
        # @return [String] The ID must contain only letters (a-z, A-Z), numbers
        #   (0-9), underscores (_), or dashes (-). The maximum length is 1,024
        #   characters.
        #
        def job_id
          @gapi.job_reference.job_id
        end

        ##
        # The ID of the project containing the job.
        #
        # @return [String] The project ID.
        #
        def project_id
          @gapi.job_reference.project_id
        end

        ##
        # The geographic location where the job runs.
        #
        # @return [String] A geographic location, such as "US", "EU" or
        #   "asia-northeast1".
        #
        # @!group Attributes
        def location
          @gapi.job_reference.location
        end

        ##
        # The email address of the user who ran the job.
        #
        # @return [String] The email address.
        #
        def user_email
          @gapi.user_email
        end

        ##
        # The current state of the job. A `DONE` state does not mean that the
        # job completed successfully. Use {#failed?} to discover if an error
        # occurred or if the job was successful.
        #
        # @return [String, nil] The state code. The possible values are
        #   `PENDING`, `RUNNING`, and `DONE`. `nil` when the job has no status.
        #
        def state
          return nil if @gapi.status.nil?
          @gapi.status.state
        end

        ##
        # Checks if the job's state is `RUNNING` (case-insensitive).
        #
        # @return [Boolean] `true` when `RUNNING`, `false` otherwise.
        #
        def running?
          return false if state.nil?
          "running".casecmp(state).zero?
        end

        ##
        # Checks if the job's state is `PENDING` (case-insensitive).
        #
        # @return [Boolean] `true` when `PENDING`, `false` otherwise.
        #
        def pending?
          return false if state.nil?
          "pending".casecmp(state).zero?
        end

        ##
        # Checks if the job's state is `DONE`. When `true`, the job has stopped
        # running. However, a `DONE` state does not mean that the job completed
        # successfully. Use {#failed?} to detect if an error occurred or if the
        # job was successful.
        #
        # @return [Boolean] `true` when `DONE`, `false` otherwise.
        #
        def done?
          return false if state.nil?
          "done".casecmp(state).zero?
        end

        ##
        # Checks if an error is present. Use {#error} to access the error
        # object.
        #
        # @return [Boolean] `true` when there is an error, `false` otherwise.
        #
        def failed?
          !error.nil?
        end

        ##
        # The time when the job was created.
        #
        # @return [Time, nil] The creation time from the job statistics, or
        #   `nil` when the job has no statistics.
        #
        def created_at
          return nil if @gapi.statistics.nil?
          Convert.millis_to_time @gapi.statistics.creation_time
        end

        ##
        # The time when the job was started.
        # This field is present after the job's state changes from `PENDING`
        # to either `RUNNING` or `DONE`.
        #
        # @return [Time, nil] The start time from the job statistics, or `nil`
        #   when the job has no statistics.
        #
        def started_at
          return nil if @gapi.statistics.nil?
          Convert.millis_to_time @gapi.statistics.start_time
        end

        ##
        # The time when the job ended.
        # This field is present when the job's state is `DONE`.
        #
        # @return [Time, nil] The end time from the job statistics, or `nil`
        #   when the job has no statistics.
        #
        def ended_at
          return nil if @gapi.statistics.nil?
          Convert.millis_to_time @gapi.statistics.end_time
        end

        ##
        # The configuration for the job, converted to a plain hash via a JSON
        # round trip.
        #
        # @see https://cloud.google.com/bigquery/docs/reference/v2/jobs Jobs API
        #   reference
        #
        # @return [Hash] The job configuration.
        #
        def configuration
          JSON.parse @gapi.configuration.to_json
        end
        alias config configuration

        ##
        # The statistics for the job, converted to a plain hash via a JSON
        # round trip.
        #
        # @see https://cloud.google.com/bigquery/docs/reference/v2/jobs Jobs API
        #   reference
        #
        # @return [Hash] The job statistics.
        #
        def statistics
          JSON.parse @gapi.statistics.to_json
        end
        alias stats statistics

        ##
        # The job's status, converted to a plain hash via a JSON round trip.
        # The values contained in the hash are also exposed by {#state},
        # {#error}, and {#errors}, which also tolerate a job that has no status.
        #
        # @return [Hash] The job status.
        #
        def status
          JSON.parse @gapi.status.to_json
        end

        ##
        # The last error for the job, if any errors have occurred. Returns a
        # hash.
        #
        # @see https://cloud.google.com/bigquery/docs/reference/v2/jobs Jobs API
        #   reference
        #
        # @return [Hash, nil] Returns a hash containing `reason` and `message`
        #   keys, or `nil` when there is no error (or no status yet):
        #
        #   {
        #     "reason"=>"notFound",
        #     "message"=>"Not found: Table bigquery-public-data:samples.BAD_ID"
        #   }
        #
        def error
          # Same guard as #state: a job without a status has no error, rather
          # than raising while parsing/indexing a missing status.
          return nil if @gapi.status.nil?
          status["errorResult"]
        end

        ##
        # The errors for the job, if any errors have occurred. Returns an array
        # of hash objects. See {#error}.
        #
        # @return [Array<Hash>] Returns an array of hashes containing `reason`
        #   and `message` keys. The array is empty when there are no errors (or
        #   no status yet):
        #
        #   {
        #     "reason"=>"notFound",
        #     "message"=>"Not found: Table bigquery-public-data:samples.BAD_ID"
        #   }
        #
        def errors
          return [] if @gapi.status.nil?
          Array status["errors"]
        end

        ##
        # A hash of user-provided labels associated with this job. Labels can be
        # provided when the job is created, and used to organize and group jobs.
        #
        # The returned hash is frozen and changes are not allowed. Use
        # {CopyJob::Updater#labels=} or {ExtractJob::Updater#labels=} or
        # {LoadJob::Updater#labels=} or {QueryJob::Updater#labels=} to replace
        # the entire hash.
        #
        # @return [Hash] The job labels.
        #
        # @!group Attributes
        #
        def labels
          m = @gapi.configuration.labels
          m = m.to_h if m.respond_to? :to_h
          m.dup.freeze
        end

        ##
        # Cancels the job and replaces this object's data with the job returned
        # by the cancel request.
        #
        # @return [Boolean] `true` once the cancel request has returned.
        #
        # @example
        #   require "google/cloud/bigquery"
        #
        #   bigquery = Google::Cloud::Bigquery.new
        #
        #   query = "SELECT COUNT(word) as count FROM " \
        #           "`bigquery-public-data.samples.shakespeare`"
        #
        #   job = bigquery.query_job query
        #
        #   job.cancel
        #
        def cancel
          ensure_service!
          resp = service.cancel_job job_id, location: location
          @gapi = resp.job
          true
        end

        ##
        # Creates a new job with the current configuration, in the same
        # location as this job.
        #
        # @return [Job] The new job, as the subclass matching its configuration.
        #
        # @example
        #   require "google/cloud/bigquery"
        #
        #   bigquery = Google::Cloud::Bigquery.new
        #
        #   query = "SELECT COUNT(word) as count FROM " \
        #           "`bigquery-public-data.samples.shakespeare`"
        #
        #   job = bigquery.query_job query
        #
        #   job.wait_until_done!
        #   job.rerun!
        #
        def rerun!
          ensure_service!
          gapi = service.insert_job @gapi.configuration, location: location
          Job.from_gapi gapi, service
        end

        ##
        # Reloads the job with current data from the BigQuery service.
        #
        # @return The refreshed underlying API job object, which also replaces
        #   this job's data.
        #
        # @example
        #   require "google/cloud/bigquery"
        #
        #   bigquery = Google::Cloud::Bigquery.new
        #
        #   query = "SELECT COUNT(word) as count FROM " \
        #           "`bigquery-public-data.samples.shakespeare`"
        #
        #   job = bigquery.query_job query
        #
        #   job.done?
        #   job.reload!
        #   job.done? #=> true
        #
        def reload!
          ensure_service!
          @gapi = service.get_job job_id, location: location
        end
        alias refresh! reload!

        ##
        # Refreshes the job until the job is `DONE`. The delay between refreshes
        # starts at 5 seconds and grows quadratically (5, 6, 9, 14, ... seconds)
        # up to a maximum of 60 seconds. There is no overall timeout.
        #
        # @example
        #   require "google/cloud/bigquery"
        #
        #   bigquery = Google::Cloud::Bigquery.new
        #   dataset = bigquery.dataset "my_dataset"
        #   table = dataset.table "my_table"
        #
        #   extract_job = table.extract_job "gs://my-bucket/file-name.json",
        #                                   format: "json"
        #   extract_job.wait_until_done!
        #   extract_job.done? #=> true
        #
        def wait_until_done!
          retries = 0
          until done?
            sleep [retries**2 + 5, 60].min # Maximum delay is 60 seconds.
            retries += 1
            reload!
          end
        end

        ##
        # @private New Job from a Google API Client object. The concrete class
        # is chosen from the job configuration (see {.klass_for}).
        def self.from_gapi gapi, conn
          klass = klass_for gapi
          klass.new.tap do |f|
            f.gapi = gapi
            f.service = conn
          end
        end

        ##
        # @private New Google::Apis::Error with job failure details, or `nil`
        # when the job has not failed. The error body is the job's error
        # result plus an `"errors"` key holding {#errors}.
        def gapi_error
          return nil unless failed?
          # #error re-parses the status JSON on every call; do it only once.
          error_result = error
          error_status_code = status_code_for_reason error_result["reason"]
          error_body = error_result.merge "errors" => errors
          Google::Apis::Error.new error_result["message"],
                                  status_code: error_status_code,
                                  body: error_body
        end

        ##
        # @private
        # Get the subclass for a job type, based on which configuration section
        # is present. Falls back to the generic {Job}.
        def self.klass_for gapi
          if gapi.configuration.copy
            CopyJob
          elsif gapi.configuration.extract
            ExtractJob
          elsif gapi.configuration.load
            LoadJob
          elsif gapi.configuration.query
            QueryJob
          else
            Job
          end
        end

        protected

        ##
        # Raise an error unless an active connection is available.
        def ensure_service!
          raise "Must have active connection" unless service
        end

        ##
        # Fetches a table through the service, returning `nil` when the table
        # does not exist.
        def retrieve_table project_id, dataset_id, table_id
          ensure_service!
          gapi = service.get_project_table project_id, dataset_id, table_id
          Table.from_gapi gapi, service
        rescue Google::Cloud::NotFoundError
          nil
        end

        ##
        # Status code for a BigQuery error reason; `0` for unknown reasons.
        def status_code_for_reason reason
          STATUS_CODES_BY_REASON.fetch reason, 0
        end
      end
    end
  end
end
# We need Job to be defined before loading these.
require "google/cloud/bigquery/copy_job"
require "google/cloud/bigquery/extract_job"
require "google/cloud/bigquery/load_job"
require "google/cloud/bigquery/query_job"