# Copyright 2015 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require "bigquery_helper"
describe Google::Cloud::Bigquery::Dataset, :bigquery do
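  # Helpers below lazily create the dataset, tables, and view on first
  # reference, reusing the existing resource when one is already present.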
let(:publicdata_query) { "SELECT url FROM `bigquery-public-data.samples.github_nested` LIMIT 100" }
let(:dataset_id) { "#{prefix}_dataset" }
let(:dataset) do
d = bigquery.dataset dataset_id
if d.nil?
d = bigquery.create_dataset dataset_id
end
d
end
let(:table_id) { "dataset_table" }
let(:table) do
t = dataset.table table_id
if t.nil?
t = dataset.create_table table_id
end
t
end
let(:table_with_schema_id) { "dataset_table_with_schema" }
let(:table_with_schema) do
t = dataset.table table_with_schema_id
if t.nil?
t = dataset.create_table table_with_schema_id do |schema|
schema.integer "id", description: "id description", mode: :required
schema.string "breed", description: "breed description", mode: :required
schema.string "name", description: "name description", mode: :required
schema.timestamp "dob", description: "dob description", mode: :required
schema.numeric "my_numeric", mode: :nullable
schema.bignumeric "my_bignumeric", mode: :nullable
end
end
t
end
let(:table_avro_id) { "dataset_table_avro" }
let(:table_avro) { dataset.table table_avro_id }
let(:table_orc_id) { "dataset_table_orc" }
let(:table_orc) { dataset.table table_orc_id }
let(:local_orc_file) { "acceptance/data/us-states.orc" }
let(:table_parquet_id) { "dataset_table_parquet" }
let(:table_parquet) { dataset.table table_parquet_id }
let(:local_parquet_file) { "acceptance/data/us-states.parquet" }
let(:rows) do
[
{ name: "silvano", breed: "the cat kind", id: 4, dob: Time.now.utc },
{ name: "ryan", breed: "golden retriever?", id: 5, dob: Time.now.utc },
{ name: "stephen", breed: "idkanycatbreeds", id: 6, dob: Time.now.utc }
]
end
let(:invalid_rows) do
[
{ name: "silvano", breed: "the cat kind", id: 4, dob: Time.now.utc },
{ name: nil, breed: "golden retriever?", id: 5, dob: Time.now.utc },
{ name: "stephen", breed: "idkanycatbreeds", id: 6, dob: Time.now.utc }
]
end
  let(:insert_ids) { Array.new(3) { SecureRandom.uuid } }
let(:view_id) { "dataset_view" }
let(:view) do
t = dataset.table view_id
if t.nil?
t = dataset.create_view view_id, publicdata_query
end
t
end
let(:local_file) { "acceptance/data/kitten-test-data.json" }
let(:schema_update_options) { ["ALLOW_FIELD_ADDITION", "ALLOW_FIELD_RELAXATION"] }
let(:clustering_fields) { ["breed", "name"] }
let(:string_numeric) { "0.123456789" }
let(:string_bignumeric) { "0.12345678901234567890123456789012345678" }
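  # Touch the table and view helpers before each test so both resources exist.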
before do
table
view
end
it "has the attributes of a dataset" do
fresh = bigquery.dataset dataset_id
_(fresh).must_be_kind_of Google::Cloud::Bigquery::Dataset
_(fresh.project_id).must_equal bigquery.project
_(fresh.dataset_id).must_equal dataset.dataset_id
_(fresh.etag).wont_be :nil?
_(fresh.api_url).wont_be :nil?
_(fresh.created_at).must_be_kind_of Time
_(fresh.modified_at).must_be_kind_of Time
_(fresh.dataset_ref).must_be_kind_of Hash
_(fresh.dataset_ref[:project_id]).must_equal bigquery.project
_(fresh.dataset_ref[:dataset_id]).must_equal dataset.dataset_id
# fresh.location.must_equal "US" TODO why nil? Set in dataset
end
describe "#delete" do
let(:dataset_delete_id) { "#{prefix}_dataset_for_delete" }
let(:dataset_delete) do
d = bigquery.dataset dataset_delete_id
if d.nil?
d = bigquery.create_dataset dataset_delete_id
end
d
end
it "deletes itself and knows it no longer exists" do
_(dataset_delete.exists?).must_equal true
dataset_delete.tables.all(&:delete)
_(dataset_delete.delete).must_equal true
_(dataset_delete.exists?).must_equal false
_(dataset_delete.exists?(force: true)).must_equal false
end
end
it "should set & get metadata" do
new_name = "New name"
new_desc = "New description!"
new_default_expiration = 12345678
new_labels = { "bar" => "baz" }
dataset.name = new_name
dataset.description = new_desc
dataset.default_expiration = new_default_expiration
dataset.labels = new_labels
fresh = bigquery.dataset dataset.dataset_id
_(fresh).wont_be :nil?
_(fresh).must_be_kind_of Google::Cloud::Bigquery::Dataset
_(fresh.dataset_id).must_equal dataset.dataset_id
_(fresh.name).must_equal new_name
_(fresh.description).must_equal new_desc
_(fresh.default_expiration).must_equal new_default_expiration
_(fresh.labels).must_equal new_labels
dataset.default_expiration = nil
end
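  # Dataset updates are guarded by etags: each update carries the last-seen
  # etag, so a concurrent modification invalidates it and the stale write fails.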
it "should fail to set metadata with stale etag" do
fresh = bigquery.dataset dataset.dataset_id
_(fresh.etag).wont_be :nil?
stale = bigquery.dataset dataset_id
_(stale.etag).wont_be :nil?
_(stale.etag).must_equal fresh.etag
# Modify on the server, which will change the etag
fresh.description = "Description 1"
_(stale.etag).wont_equal fresh.etag
err = expect { stale.description = "Description 2" }.must_raise Google::Cloud::FailedPreconditionError
_(err.message).must_equal "failedPrecondition: Precondition check failed."
end
it "create dataset returns valid etag equal to get dataset" do
fresh_dataset_id = "#{prefix}_#{rand 100}_unique"
fresh = bigquery.create_dataset fresh_dataset_id
_(fresh.etag).wont_be :nil?
stale = bigquery.dataset fresh_dataset_id
_(stale.etag).wont_be :nil?
_(stale.etag).must_equal fresh.etag
end
it "should get a list of tables and views" do
tables = dataset.tables
    # The before block ensures the dataset has at least one table and one view
_(tables.count).must_be :>=, 2
tables.each do |t|
_(t.table_id).wont_be :nil?
_(t.created_at).must_be_kind_of Time # Loads full representation
end
end
it "should get all tables and views in pages with token" do
tables = dataset.tables(max: 1).all
_(tables.count).must_be :>=, 2
tables.each do |t|
_(t.table_id).wont_be :nil?
_(t.created_at).must_be_kind_of Time # Loads full representation
end
end
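  # Hive partitioning reads partition keys from the GCS object paths; in :auto
  # mode BigQuery infers the key names and types from the directory layout.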
it "imports parquet data from GCS uri using hive partitioning with auto layout with load_job" do
gcs_uri = "gs://cloud-samples-data/bigquery/hive-partitioning-samples/autolayout/*"
source_uri_prefix = "gs://cloud-samples-data/bigquery/hive-partitioning-samples/autolayout/"
job_id = "test_job_#{SecureRandom.urlsafe_base64(21)}" # client-generated
job = dataset.load_job "gcs_hive_table_#{SecureRandom.hex(21)}", gcs_uri, job_id: job_id do |job|
job.format = :parquet
job.hive_partitioning_mode = :auto
job.hive_partitioning_source_uri_prefix = source_uri_prefix
end
_(job).must_be_kind_of Google::Cloud::Bigquery::LoadJob
_(job.job_id).must_equal job_id
job.wait_until_done!
_(job).wont_be :failed?
_(job.output_rows).must_equal 100
_(job.parquet?).must_equal true
_(job.hive_partitioning?).must_equal true
_(job.hive_partitioning_mode).must_equal "AUTO"
_(job.hive_partitioning_source_uri_prefix).must_equal source_uri_prefix
end
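  # In :custom mode the partition key schema must instead be encoded in the
  # source URI prefix, e.g. gs://bucket/path/{pkey:STRING}/.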
it "imports parquet data from GCS uri using hive partitioning with custom layout with load_job" do
gcs_uri = "gs://cloud-samples-data/bigquery/hive-partitioning-samples/customlayout/*"
source_uri_prefix = "gs://cloud-samples-data/bigquery/hive-partitioning-samples/customlayout/"
source_uri_prefix_with_schema = "#{source_uri_prefix}{pkey:STRING}/"
job_id = "test_job_#{SecureRandom.urlsafe_base64(21)}" # client-generated
job = dataset.load_job "gcs_hive_table_#{SecureRandom.hex(21)}", gcs_uri, job_id: job_id do |job|
job.format = :parquet
job.hive_partitioning_mode = :custom
job.hive_partitioning_source_uri_prefix = source_uri_prefix_with_schema
end
_(job).must_be_kind_of Google::Cloud::Bigquery::LoadJob
_(job.job_id).must_equal job_id
job.wait_until_done!
_(job).wont_be :failed?
_(job.output_rows).must_equal 150
_(job.parquet?).must_equal true
_(job.hive_partitioning?).must_equal true
_(job.hive_partitioning_mode).must_equal "CUSTOM"
_(job.hive_partitioning_source_uri_prefix).must_equal source_uri_prefix
end
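  # Range partitioning buckets rows by an integer column: start 0, interval 10,
  # end 100 yields ten partitions covering ids 0-99.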
it "imports data from a local file and creates a new table with schema and range partitioning in a block with load_job" do
job_id = "test_job_#{SecureRandom.urlsafe_base64(21)}" # client-generated
job = dataset.load_job "local_file_table_#{SecureRandom.hex(21)}", local_file, job_id: job_id do |job|
job.schema.integer "id", description: "id description", mode: :required
job.schema.string "breed", description: "breed description", mode: :required
job.schema.string "name", description: "name description", mode: :required
job.schema.timestamp "dob", description: "dob description", mode: :required
job.schema_update_options = schema_update_options
job.range_partitioning_field = "id"
job.range_partitioning_start = 0
job.range_partitioning_interval = 10
job.range_partitioning_end = 100
end
_(job).must_be_kind_of Google::Cloud::Bigquery::LoadJob
_(job.job_id).must_equal job_id
job.wait_until_done!
_(job.output_rows).must_equal 3
_(job.schema_update_options).must_equal schema_update_options
_(job.range_partitioning?).must_equal true
_(job.range_partitioning_field).must_equal "id"
_(job.range_partitioning_start).must_equal 0
_(job.range_partitioning_interval).must_equal 10
_(job.range_partitioning_end).must_equal 100
end
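  # Time partitioning splits the table by the DAY of the dob column, expiring
  # partitions after 86,400 seconds (one day); clustering orders rows within
  # each partition by breed, then name.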
it "imports data from a local file and creates a new table with schema and time partitioning in a block with load_job" do
job_id = "test_job_#{SecureRandom.urlsafe_base64(21)}" # client-generated
job = dataset.load_job "local_file_table_#{SecureRandom.hex(21)}", local_file, job_id: job_id do |job|
job.schema.integer "id", description: "id description", mode: :required
job.schema.string "breed", description: "breed description", mode: :required
job.schema.string "name", description: "name description", mode: :required
job.schema.timestamp "dob", description: "dob description", mode: :required
job.schema_update_options = schema_update_options
job.time_partitioning_type = "DAY"
job.time_partitioning_field = "dob"
job.time_partitioning_expiration = 86_400
job.time_partitioning_require_filter = true
job.clustering_fields = clustering_fields
end
_(job).must_be_kind_of Google::Cloud::Bigquery::LoadJob
_(job.job_id).must_equal job_id
job.wait_until_done!
_(job.output_rows).must_equal 3
_(job.schema_update_options).must_equal schema_update_options
_(job.time_partitioning?).must_equal true
_(job.time_partitioning_type).must_equal "DAY"
_(job.time_partitioning_field).must_equal "dob"
_(job.time_partitioning_expiration).must_equal 86_400
_(job.time_partitioning_require_filter?).must_equal true
_(job.clustering?).must_equal true
_(job.clustering_fields).must_equal clustering_fields
end
it "imports data from a local file and creates a new table with schema as an option with load_job" do
schema = bigquery.schema do |s|
s.integer "id", description: "id description", mode: :required
s.string "breed", description: "breed description", mode: :required
s.string "name", description: "name description", mode: :required
s.timestamp "dob", description: "dob description", mode: :required
end
job = dataset.load_job "local_file_table_2", local_file, schema: schema
_(job.hive_partitioning?).must_equal false
_(job.hive_partitioning_mode).must_be_nil
_(job.hive_partitioning_source_uri_prefix).must_be_nil
_(job.range_partitioning?).must_equal false
_(job.range_partitioning_field).must_be_nil
_(job.range_partitioning_start).must_be_nil
_(job.range_partitioning_interval).must_be_nil
_(job.range_partitioning_end).must_be_nil
_(job.time_partitioning?).must_equal false
    _(job.time_partitioning_type).must_be_nil
    _(job.time_partitioning_field).must_be_nil
    _(job.time_partitioning_expiration).must_be_nil
_(job.time_partitioning_require_filter?).must_equal false
job.wait_until_done!
_(job.output_rows).must_equal 3
end
it "imports data from a local file and creates a new table without a schema with load_job" do
job = dataset.load_job table_with_schema.table_id, local_file, create: :never
job.wait_until_done!
_(job.output_rows).must_equal 3
end
it "imports data from a list of files in your bucket with load_job" do
more_data = rows.map { |row| JSON.generate row }.join("\n")
file1 = bucket.create_file local_file, random_file_destination_name
file2 = bucket.create_file StringIO.new(more_data), random_file_destination_name
gs_url = "gs://#{file2.bucket}/#{file2.name}"
# Test both by file object and URL as string
job = dataset.load_job table_with_schema.table_id, [file1, gs_url]
job.wait_until_done!
_(job).wont_be :failed?
_(job.input_files).must_equal 2
_(job.output_rows).must_equal 6
end
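  # Unlike #load_job, which returns the job for the caller to poll, the #load
  # variants below block until the load job finishes and return true on success.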
it "imports data from a local file and creates a new table with specified schema in a block with load" do
result = dataset.load "local_file_table_3", local_file do |schema|
schema.integer "id", description: "id description", mode: :required
schema.string "breed", description: "breed description", mode: :required
schema.string "name", description: "name description", mode: :required
schema.timestamp "dob", description: "dob description", mode: :required
end
_(result).must_equal true
end
it "imports data from a local file and creates a new table with specified schema as an option with load" do
schema = bigquery.schema do |s|
s.integer "id", description: "id description", mode: :required
s.string "breed", description: "breed description", mode: :required
s.string "name", description: "name description", mode: :required
s.timestamp "dob", description: "dob description", mode: :required
end
result = dataset.load "local_file_table_4", local_file, schema: schema
_(result).must_equal true
end
it "imports data from a local file and creates a new table without a schema with load" do
result = dataset.load table_with_schema.table_id, local_file do |job|
job.create = :never
end
_(result).must_equal true
end
it "imports data from a list of files in your bucket with load" do
more_data = rows.map { |row| JSON.generate row }.join("\n")
file1 = bucket.create_file local_file, random_file_destination_name
file2 = bucket.create_file StringIO.new(more_data), random_file_destination_name
gs_url = "gs://#{file2.bucket}/#{file2.name}"
# Test both by file object and URL as string
result = dataset.load table_with_schema.table_id, [file1, gs_url]
_(result).must_equal true
end
it "imports data from GCS Avro file and creates a new table with load" do
result = dataset.load(
table_avro_id,
"gs://#{samples_bucket}/bigquery/us-states/us-states.avro")
_(result).must_equal true
end
it "imports data from GCS Avro file and creates a new table with encryption with load" do
encrypt_config = bigquery.encryption(kms_key: kms_key)
result = dataset.load(
table_avro_id,
"gs://#{samples_bucket}/bigquery/us-states/us-states.avro") do |load|
load.write = :truncate
load.encryption = encrypt_config
end
_(result).must_equal true
table_avro.reload!
_(table_avro.encryption).must_equal encrypt_config
end
it "imports data from a local ORC file and creates a new table without a schema with load" do
result = dataset.load table_orc_id, local_orc_file
_(result).must_equal true
end
it "imports data from GCS ORC file and creates a new table with load" do
result = dataset.load(
table_orc_id,
"gs://#{samples_bucket}/bigquery/us-states/us-states.orc")
_(result).must_equal true
end
it "imports data from a local Parquet file and creates a new table without a schema with load" do
result = dataset.load table_parquet_id, local_parquet_file
_(result).must_equal true
end
it "imports data from GCS Parquet file and creates a new table with load" do
result = dataset.load(
table_parquet_id,
"gs://#{samples_bucket}/bigquery/us-states/us-states.parquet")
_(result).must_equal true
end
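  # The insert tests below use the streaming API (tabledata.insertAll), which
  # appends rows immediately without a load job and reports per-row errors.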
it "inserts rows directly and gets its data" do
insert_response = dataset.insert table_with_schema.table_id, rows
_(insert_response).must_be :success?
_(insert_response.insert_count).must_equal 3
_(insert_response.insert_errors).must_be :empty?
_(insert_response.error_rows).must_be :empty?
assert_data table_with_schema.data(max: 1)
end
it "insert skip invalid rows and return insert errors" do
insert_response = dataset.insert table_with_schema.table_id, invalid_rows, skip_invalid: true
_(insert_response).wont_be :success?
_(insert_response.insert_count).must_equal 2
_(insert_response.insert_errors).wont_be :empty?
_(insert_response.insert_errors.count).must_equal 1
_(insert_response.insert_errors.first.class).must_equal Google::Cloud::Bigquery::InsertResponse::InsertError
_(insert_response.insert_errors.first.index).must_equal 1
bigquery_row = invalid_rows[insert_response.insert_errors.first.index]
_(insert_response.insert_errors.first.row).must_equal bigquery_row
_(insert_response.error_rows).wont_be :empty?
_(insert_response.error_rows.count).must_equal 1
_(insert_response.error_rows.first).must_equal bigquery_row
_(insert_response.insert_error_for(invalid_rows[1]).index).must_equal insert_response.insert_errors.first.index
_(insert_response.errors_for(invalid_rows[1])).wont_be :empty?
_(insert_response.index_for(invalid_rows[1])).must_equal 1
end
it "inserts rows with autocreate option" do
    # The schema block is not needed in this test since the table already exists, but provide it anyway
insert_response = dataset.insert table_with_schema.table_id, rows, autocreate: true do |t|
t.schema.integer "id", description: "id description", mode: :required
t.schema.string "breed", description: "breed description", mode: :required
t.schema.string "name", description: "name description", mode: :required
t.schema.timestamp "dob", description: "dob description", mode: :required
end
_(insert_response).must_be :success?
_(insert_response.insert_count).must_equal 3
_(insert_response.insert_errors).must_be :empty?
_(insert_response.error_rows).must_be :empty?
table = dataset.table table_with_schema_id
_(table).wont_be_nil
assert_data table.data(max: 1)
end
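  # One insert_id per row allows BigQuery to deduplicate retried streaming
  # inserts on a best-effort basis.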
it "inserts rows with insert_ids option" do
insert_response = dataset.insert table_with_schema.table_id, rows, insert_ids: insert_ids
_(insert_response).must_be :success?
_(insert_response.insert_count).must_equal 3
_(insert_response.insert_errors).must_be :empty?
_(insert_response.error_rows).must_be :empty?
assert_data table_with_schema.data(max: 1)
end
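  # NUMERIC supports up to scale 9 and BIGNUMERIC up to scale 38; BigDecimal
  # values are rounded to NUMERIC scale, so a full-scale BIGNUMERIC value must
  # be passed as a String.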
it "inserts row with max scale numeric and bignumeric values" do
rows = [
{
name: "cat 7",
breed: "the cat kind",
id: 7,
dob: Time.now.utc,
my_numeric: BigDecimal(string_numeric),
my_bignumeric: string_bignumeric # BigDecimal would be rounded, use String instead!
},
{
name: "cat 8",
breed: "the cat kind",
id: 8,
dob: Time.now.utc,
my_numeric: BigDecimal(string_numeric),
my_bignumeric: BigDecimal(string_bignumeric) # BigDecimal will be rounded to scale 9.
}
]
insert_response = dataset.insert table_with_schema.table_id, rows
_(insert_response).must_be :success?
_(insert_response.insert_count).must_equal 2
_(insert_response.insert_errors).must_be :empty?
_(insert_response.error_rows).must_be :empty?
data = dataset.query "SELECT id, my_numeric, my_bignumeric FROM #{table_with_schema_id} WHERE id IN (7,8) ORDER BY id"
_(data.count).must_equal 2
_(data.total).must_equal 2
_(data[0][:my_numeric]).must_equal BigDecimal(string_numeric)
_(data[0][:my_bignumeric]).must_equal BigDecimal(string_bignumeric)
_(data[1][:my_numeric]).must_equal BigDecimal(string_numeric)
_(data[1][:my_bignumeric]).must_equal BigDecimal(string_numeric) # Rounded to scale 9.
end
it "creates missing table while inserts rows with autocreate option" do
new_table_id = "new_dataset_table_id_#{rand(1000)}"
insert_response = dataset.insert new_table_id, rows, autocreate: true do |t|
t.schema.integer "id", description: "id description", mode: :required
t.schema.string "breed", description: "breed description", mode: :required
t.schema.string "name", description: "name description", mode: :required
t.schema.timestamp "dob", description: "dob description", mode: :required
end
_(insert_response).must_be :success?
_(insert_response.insert_count).must_equal 3
_(insert_response.insert_errors).must_be :empty?
_(insert_response.error_rows).must_be :empty?
table = dataset.table new_table_id
_(table).wont_be_nil
assert_data table.data(max: 1)
end
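  # A session lets later jobs see state created by earlier ones, such as
  # temporary tables: create_session: true starts the session, and session_id
  # attaches subsequent jobs and queries to it.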
it "queries in session mode" do
job = dataset.query_job "CREATE TEMPORARY TABLE temptable AS SELECT 17 as foo", create_session: true
job.wait_until_done!
_(job).wont_be :failed?
_(job.session_id).wont_be :nil?
job_2 = dataset.query_job "SELECT * FROM temptable", session_id: job.session_id
job_2.wait_until_done!
_(job_2).wont_be :failed?
_(job_2.session_id).wont_be :nil?
_(job_2.session_id).must_equal job.session_id
_(job_2.data.first).wont_be :nil?
_(job_2.data.first[:foo]).must_equal 17
data = dataset.query "SELECT * FROM temptable", session_id: job.session_id
_(data.first).wont_be :nil?
_(data.first[:foo]).must_equal 17
end
end