Skip to content

Commit

Permalink
feat(bigquery): Add session support
Browse files Browse the repository at this point in the history
* Add create_session and session_id params to Project#query_job
* Add session_id param to Project#query
* Add create_session and session_id params to Dataset#query_job
* Add session_id param to Dataset#query
* Add Job#session_id
* Add QueryJob::Updater#create_session=
* Add QueryJob::Updater#session_id=

closes: googleapis#15553
  • Loading branch information
quartzmo committed Nov 4, 2021
1 parent 77e7234 commit 71a8df1
Show file tree
Hide file tree
Showing 12 changed files with 324 additions and 40 deletions.
2 changes: 1 addition & 1 deletion google-cloud-bigquery/.rubocop.yml
Expand Up @@ -14,7 +14,7 @@ Style/Documentation:
Lint/MixedRegexpCaptureTypes:
Enabled: false
Metrics/AbcSize:
Max: 50
Max: 55
Metrics/BlockLength:
Exclude:
- "google-cloud-bigquery.gemspec"
Expand Down
19 changes: 19 additions & 0 deletions google-cloud-bigquery/acceptance/bigquery/advanced_test.rb
Expand Up @@ -69,6 +69,25 @@
_(rows[2]).must_equal({ name: "Gandalf", spells_name: "Skydragon", spells_properties_name: "Explodey", spells_properties_power: 11.0 })
end

it "queries in session mode" do
  # Open a session with a job that builds a temporary table.
  session_job = bigquery.query_job "CREATE TEMPORARY TABLE temptable AS SELECT 17 as foo",
                                   dataset: dataset,
                                   create_session: true
  session_job.wait_until_done!
  _(session_job).wont_be :failed?
  _(session_job.session_id).wont_be :nil?

  # Read the temporary table from a second job, passing the first job's session ID.
  select_job = bigquery.query_job "SELECT * FROM temptable",
                                  dataset: dataset,
                                  session_id: session_job.session_id
  select_job.wait_until_done!
  _(select_job).wont_be :failed?
  _(select_job.session_id).wont_be :nil?
  _(select_job.session_id).must_equal session_job.session_id
  _(select_job.data.first).wont_be :nil?
  _(select_job.data.first[:foo]).must_equal 17

  # The synchronous Project#query accepts the same session ID.
  rows = bigquery.query "SELECT * FROM temptable",
                        dataset: dataset,
                        session_id: session_job.session_id
  _(rows.first).wont_be :nil?
  _(rows.first[:foo]).must_equal 17
end

it "modifies a nested schema via field" do
empty_table_id = "#{table_id}_empty"
empty_table = dataset.table empty_table_id
Expand Down
Expand Up @@ -218,6 +218,7 @@
query_job = dataset.query_job query, job_id: job_id
_(query_job).must_be_kind_of Google::Cloud::Bigquery::QueryJob
_(query_job.job_id).must_equal job_id
_(query_job.session_id).must_be :nil?
query_job.wait_until_done!
_(query_job.done?).must_equal true
_(query_job.data.total).wont_be_nil
Expand Down
19 changes: 19 additions & 0 deletions google-cloud-bigquery/acceptance/bigquery/dataset_test.rb
Expand Up @@ -541,4 +541,23 @@

assert_data table.data(max: 1)
end

it "queries in session mode" do
  # Open a session with a job that builds a temporary table.
  session_job = dataset.query_job "CREATE TEMPORARY TABLE temptable AS SELECT 17 as foo",
                                  create_session: true
  session_job.wait_until_done!
  _(session_job).wont_be :failed?
  _(session_job.session_id).wont_be :nil?

  # Read the temporary table from a second job, passing the first job's session ID.
  select_job = dataset.query_job "SELECT * FROM temptable",
                                 session_id: session_job.session_id
  select_job.wait_until_done!
  _(select_job).wont_be :failed?
  _(select_job.session_id).wont_be :nil?
  _(select_job.session_id).must_equal session_job.session_id
  _(select_job.data.first).wont_be :nil?
  _(select_job.data.first[:foo]).must_equal 17

  # The synchronous Dataset#query accepts the same session ID.
  rows = dataset.query "SELECT * FROM temptable",
                       session_id: session_job.session_id
  _(rows.first).wont_be :nil?
  _(rows.first[:foo]).must_equal 17
end
end
89 changes: 74 additions & 15 deletions google-cloud-bigquery/lib/google/cloud/bigquery/dataset.rb
Expand Up @@ -1244,6 +1244,8 @@ def routines token: nil, max: nil, filter: nil
# Flattens all nested and repeated fields in the query results. The
# default value is `true`. `large_results` parameter must be `true` if
# this is set to `false`.
# @param [Integer] maximum_billing_tier Deprecated: Change the billing
# tier to allow high-compute queries.
# @param [Integer] maximum_bytes_billed Limits the bytes billed for this
# job. Queries that will have bytes billed beyond this limit will fail
# (without incurring a charge). Optional. If unspecified, this will be
Expand Down Expand Up @@ -1294,8 +1296,12 @@ def routines token: nil, max: nil, filter: nil
# For additional information on migrating, see: [Migrating to
# standard SQL - Differences in user-defined JavaScript
# functions](https://cloud.google.com/bigquery/docs/reference/standard-sql/migrating-from-legacy-sql#differences_in_user-defined_javascript_functions)
# @param [Integer] maximum_billing_tier Deprecated: Change the billing
# tier to allow high-compute queries.
# @param [Boolean] create_session If true, creates a new session whose
# session ID is a server-generated random ID. If false, runs the query
# in the existing session given by the `session_id` param when one is
# provided; otherwise runs the query in non-session mode. See {Job#session_id}.
# @param [String] session_id The ID of an existing session. See also the
# `create_session` param and {Job#session_id}.
# @yield [job] a job configuration object
# @yieldparam [Google::Cloud::Bigquery::QueryJob::Updater] job a job
# configuration object for setting additional options for the query.
Expand Down Expand Up @@ -1435,16 +1441,52 @@ def routines token: nil, max: nil, filter: nil
#
# @!group Data
#
def query_job query, params: nil, types: nil, external: nil, priority: "INTERACTIVE", cache: true, table: nil,
create: nil, write: nil, dryrun: nil, standard_sql: nil, legacy_sql: nil, large_results: nil,
flatten: nil, maximum_billing_tier: nil, maximum_bytes_billed: nil, job_id: nil, prefix: nil,
labels: nil, udfs: nil
def query_job query,
params: nil,
types: nil,
external: nil,
priority: "INTERACTIVE",
cache: true,
table: nil,
create: nil,
write: nil,
dryrun: nil,
standard_sql: nil,
legacy_sql: nil,
large_results: nil,
flatten: nil,
maximum_billing_tier: nil,
maximum_bytes_billed: nil,
job_id: nil,
prefix: nil,
labels: nil,
udfs: nil,
create_session: nil,
session_id: nil
ensure_service!
options = { params: params, types: types, external: external, priority: priority, cache: cache, table: table,
create: create, write: write, dryrun: dryrun, standard_sql: standard_sql, legacy_sql: legacy_sql,
large_results: large_results, flatten: flatten, maximum_billing_tier: maximum_billing_tier,
maximum_bytes_billed: maximum_bytes_billed, job_id: job_id, prefix: prefix, labels: labels,
udfs: udfs }
options = {
params: params,
types: types,
external: external,
priority: priority,
cache: cache,
table: table,
create: create,
write: write,
dryrun: dryrun,
standard_sql: standard_sql,
legacy_sql: legacy_sql,
large_results: large_results,
flatten: flatten,
maximum_billing_tier: maximum_billing_tier,
maximum_bytes_billed: maximum_bytes_billed,
job_id: job_id,
prefix: prefix,
labels: labels,
udfs: udfs,
create_session: create_session,
session_id: session_id
}

updater = QueryJob::Updater.from_options service, query, options
updater.dataset = self
Expand Down Expand Up @@ -1566,6 +1608,8 @@ def query_job query, params: nil, types: nil, external: nil, priority: "INTERACT
# When set to false, the values of `large_results` and `flatten` are
# ignored; the query will be run as if `large_results` is true and
# `flatten` is false. Optional. The default value is false.
# @param [String] session_id The ID of an existing session. See the
# `create_session` param in {#query_job} and {Job#session_id}.
# @yield [job] a job configuration object
# @yieldparam [Google::Cloud::Bigquery::QueryJob::Updater] job a job
# configuration object for setting additional options for the query.
Expand Down Expand Up @@ -1699,10 +1743,25 @@ def query_job query, params: nil, types: nil, external: nil, priority: "INTERACT
#
# @!group Data
#
def query query, params: nil, types: nil, external: nil, max: nil, cache: true,
standard_sql: nil, legacy_sql: nil, &block
job = query_job query, params: params, types: types, external: external, cache: cache,
standard_sql: standard_sql, legacy_sql: legacy_sql, &block
def query query,
params: nil,
types: nil,
external: nil,
max: nil,
cache: true,
standard_sql: nil,
legacy_sql: nil,
session_id: nil,
&block
job = query_job query,
params: params,
types: types,
external: external,
cache: cache,
standard_sql: standard_sql,
legacy_sql: legacy_sql,
session_id: session_id,
&block
job.wait_until_done!
ensure_job_succeeded! job

Expand Down
10 changes: 10 additions & 0 deletions google-cloud-bigquery/lib/google/cloud/bigquery/job.rb
Expand Up @@ -226,6 +226,16 @@ def reservation_usage
Array(@gapi.statistics.reservation_usage).map { |g| ReservationUsage.from_gapi g }
end

##
# The ID of the session if this job is part of one. See the `create_session` param in {Project#query_job} and
# {Dataset#query_job}.
#
# @return [String, nil] The session ID, or `nil` if the job is not associated with a session (including when
#   the job resource carries no statistics).
#
def session_id
  # Guard every link: a missing `statistics` or `session_info` yields nil rather than a NoMethodError.
  # NOTE(review): statistics is presumably unset on job resources not yet returned by the service - confirm.
  @gapi.statistics&.session_info&.session_id
end

##
# The ID of a multi-statement transaction.
#
Expand Down
98 changes: 82 additions & 16 deletions google-cloud-bigquery/lib/google/cloud/bigquery/project.rb
Expand Up @@ -402,6 +402,8 @@ def copy source_table, destination_table, create: nil, write: nil, &block
# Flattens all nested and repeated fields in the query results. The
# default value is `true`. `large_results` parameter must be `true` if
# this is set to `false`.
# @param [Integer] maximum_billing_tier Deprecated: Change the billing
# tier to allow high-compute queries.
# @param [Integer] maximum_bytes_billed Limits the bytes billed for this
# job. Queries that will have bytes billed beyond this limit will fail
# (without incurring a charge). Optional. If unspecified, this will be
Expand Down Expand Up @@ -455,8 +457,12 @@ def copy source_table, destination_table, create: nil, write: nil, &block
# For additional information on migrating, see: [Migrating to
# standard SQL - Differences in user-defined JavaScript
# functions](https://cloud.google.com/bigquery/docs/reference/standard-sql/migrating-from-legacy-sql#differences_in_user-defined_javascript_functions)
# @param [Integer] maximum_billing_tier Deprecated: Change the billing
# tier to allow high-compute queries.
# @param [Boolean] create_session If true, creates a new session whose
# session ID is a server-generated random ID. If false, runs the query
# in the existing session given by the `session_id` param when one is
# provided; otherwise runs the query in non-session mode. See {Job#session_id}.
# @param [String] session_id The ID of an existing session. See also the
# `create_session` param and {Job#session_id}.
# @yield [job] a job configuration object
# @yieldparam [Google::Cloud::Bigquery::QueryJob::Updater] job a job
# configuration object for setting query options.
Expand Down Expand Up @@ -599,17 +605,56 @@ def copy source_table, destination_table, create: nil, write: nil, &block
# end
# end
#
def query_job query, params: nil, types: nil, external: nil, priority: "INTERACTIVE", cache: true, table: nil,
create: nil, write: nil, dryrun: nil, dataset: nil, project: nil, standard_sql: nil,
legacy_sql: nil, large_results: nil, flatten: nil, maximum_billing_tier: nil,
maximum_bytes_billed: nil, job_id: nil, prefix: nil, labels: nil, udfs: nil
def query_job query,
params: nil,
types: nil,
external: nil,
priority: "INTERACTIVE",
cache: true,
table: nil,
create: nil,
write: nil,
dryrun: nil,
dataset: nil,
project: nil,
standard_sql: nil,
legacy_sql: nil,
large_results: nil,
flatten: nil,
maximum_billing_tier: nil,
maximum_bytes_billed: nil,
job_id: nil,
prefix: nil,
labels: nil,
udfs: nil,
create_session: nil,
session_id: nil
ensure_service!
options = { params: params, types: types, external: external, priority: priority, cache: cache, table: table,
create: create, write: write, dryrun: dryrun, dataset: dataset,
project: (project || self.project), standard_sql: standard_sql, legacy_sql: legacy_sql,
large_results: large_results, flatten: flatten, maximum_billing_tier: maximum_billing_tier,
maximum_bytes_billed: maximum_bytes_billed, job_id: job_id, prefix: prefix, labels: labels,
udfs: udfs }
options = {
params: params,
types: types,
external: external,
priority: priority,
cache: cache,
table: table,
create: create,
write: write,
dryrun: dryrun,
dataset: dataset,
project: (project || self.project),
standard_sql: standard_sql,
legacy_sql: legacy_sql,
large_results: large_results,
flatten: flatten,
maximum_billing_tier: maximum_billing_tier,
maximum_bytes_billed: maximum_bytes_billed,
job_id: job_id,
prefix: prefix,
labels: labels,
udfs: udfs,
create_session: create_session,
session_id: session_id
}

updater = QueryJob::Updater.from_options service, query, options

Expand Down Expand Up @@ -730,6 +775,8 @@ def query_job query, params: nil, types: nil, external: nil, priority: "INTERACT
# When set to false, the values of `large_results` and `flatten` are
# ignored; the query will be run as if `large_results` is true and
# `flatten` is false. Optional. The default value is false.
# @param [String] session_id The ID of an existing session. See the
# `create_session` param in {#query_job} and {Job#session_id}.
# @yield [job] a job configuration object
# @yieldparam [Google::Cloud::Bigquery::QueryJob::Updater] job a job
# configuration object for setting additional options for the query.
Expand Down Expand Up @@ -873,10 +920,29 @@ def query_job query, params: nil, types: nil, external: nil, priority: "INTERACT
# # Retrieve the next page of results
# data = data.next if data.next?
#
def query query, params: nil, types: nil, external: nil, max: nil, cache: true, dataset: nil, project: nil,
standard_sql: nil, legacy_sql: nil, &block
job = query_job query, params: params, types: types, external: external, cache: cache, dataset: dataset,
project: project, standard_sql: standard_sql, legacy_sql: legacy_sql, &block
def query query,
params: nil,
types: nil,
external: nil,
max: nil,
cache: true,
dataset: nil,
project: nil,
standard_sql: nil,
legacy_sql: nil,
session_id: nil,
&block
job = query_job query,
params: params,
types: types,
external: external,
cache: cache,
dataset: dataset,
project: project,
standard_sql: standard_sql,
legacy_sql: legacy_sql,
session_id: session_id,
&block
job.wait_until_done!

if job.failed?
Expand Down
33 changes: 33 additions & 0 deletions google-cloud-bigquery/lib/google/cloud/bigquery/query_job.rb
Expand Up @@ -775,6 +775,8 @@ def self.from_options service, query, options
updater = QueryJob::Updater.new service, req
updater.set_params_and_types options[:params], options[:types] if options[:params]
updater.create = options[:create]
updater.create_session = options[:create_session]
updater.session_id = options[:session_id] if options[:session_id]
updater.write = options[:write]
updater.table = options[:table]
updater.dryrun = options[:dryrun]
Expand Down Expand Up @@ -1018,6 +1020,37 @@ def create= value
@gapi.configuration.query.create_disposition = Convert.create_disposition value
end

##
# Sets the create_session property on the query configuration. If true,
# a new session is created and its session ID is a server-generated
# random ID. If false, the query runs in the existing session set with
# {#session_id=} when there is one; otherwise it runs in non-session
# mode. The default value is `false`.
#
# @param [Boolean] value The create_session property. The default
#   value is `false`.
#
# @!group Attributes
def create_session= value
  query_config = @gapi.configuration.query
  query_config.create_session = value
end

##
# Sets the session ID for a query run in session mode. See {#create_session=}.
#
# The ID is stored as the `"session_id"` entry of the query
# configuration's connection properties: an existing entry is updated in
# place, otherwise a new entry is appended.
#
# @param [String] value The session ID. The default value is `nil`.
#
# @!group Attributes
def session_id= value
  query_config = @gapi.configuration.query
  query_config.connection_properties ||= []
  properties = query_config.connection_properties

  # Reuse the existing entry so the list never holds two "session_id" properties.
  existing = properties.find { |cp| cp.key == "session_id" }
  return existing.value = value if existing

  session_prop = Google::Apis::BigqueryV2::ConnectionProperty.new key: "session_id", value: value
  properties << session_prop
end

##
# Sets the write disposition for when the query results table exists.
#
Expand Down

0 comments on commit 71a8df1

Please sign in to comment.