From d3478cf668bf1b0c5cc92045751ceca88150f5ee Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 9 Mar 2022 17:55:11 +0000 Subject: [PATCH 1/8] feat(bigquery): expose connections and schema autodetect modifier This exposes the connectionID in the in the external data config, and wires in the new autodetect schema as a variadic option on the Update() method for tables. Technically this is a breaking signature change, but in practice variadic options can be omitted entirely, so existing code will continue to work without issue. --- bigquery/external.go | 7 +++++++ bigquery/external_test.go | 1 + bigquery/table.go | 35 ++++++++++++++++++++++++++++++----- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/bigquery/external.go b/bigquery/external.go index afe255660d6..902193c75b6 100644 --- a/bigquery/external.go +++ b/bigquery/external.go @@ -104,6 +104,11 @@ type ExternalDataConfig struct { // // StringTargetType supports all precision and scale values. DecimalTargetTypes []DecimalTargetType + + // ConnectionId associates an external data configuration with a connection ID. + // Connections are managed through the BigQuery Connection API: + // https://pkg.go.dev/cloud.google.com/go/bigquery/connection/apiv1 + ConnectionId string } func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration { @@ -115,6 +120,7 @@ func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration { IgnoreUnknownValues: e.IgnoreUnknownValues, MaxBadRecords: e.MaxBadRecords, HivePartitioningOptions: e.HivePartitioningOptions.toBQ(), + ConnectionId: e.ConnectionId, } if e.Schema != nil { q.Schema = e.Schema.toBQ() @@ -138,6 +144,7 @@ func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfi MaxBadRecords: q.MaxBadRecords, Schema: bqToSchema(q.Schema), HivePartitioningOptions: bqToHivePartitioningOptions(q.HivePartitioningOptions), + ConnectionId: q.ConnectionId, } for _, v := range q.DecimalTargetTypes { e.DecimalTargetTypes = append(e.DecimalTargetTypes, DecimalTargetType(v)) diff --git a/bigquery/external_test.go b/bigquery/external_test.go index 6eb930d74b4..a5b0233b1ba 100644 --- a/bigquery/external_test.go +++ b/bigquery/external_test.go @@ -41,6 +41,7 @@ func TestExternalDataConfig(t *testing.T) { SkipLeadingRows: 3, NullMarker: "marker", }, + ConnectionId: "connection", }, { SourceFormat: GoogleSheets, diff --git a/bigquery/table.go b/bigquery/table.go index 764e6b27a7c..314de03eb46 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -713,8 +713,25 @@ func (t *Table) read(ctx context.Context, pf pageFetcher) *RowIterator { // NeverExpire is a sentinel value used to remove a table'e expiration time. var NeverExpire = time.Time{}.Add(-1) +// We use this for the option pattern rather than exposing the underlying +// discovery type directly. +type tablePatchCall struct { + call *bq.TablesPatchCall +} + +// TableUpdateOption allow requests to update table metadata. +type TableUpdateOption func(*tablePatchCall) + +// WithAutoDetectSchema governs whether the schema autodetection occurs as part of the table update. +// This is relevant in cases like external tables where schema is detected from the source data. +func WithAutoDetectSchema(b bool) TableUpdateOption { + return func(tpc *tablePatchCall) { + tpc.call.AutodetectSchema(b) + } +} + // Update modifies specific Table metadata fields. -func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag string) (md *TableMetadata, err error) { +func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag string, opts ...TableUpdateOption) (md *TableMetadata, err error) { ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Table.Update") defer func() { trace.EndSpan(ctx, err) }() @@ -722,14 +739,22 @@ func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag strin if err != nil { return nil, err } - call := t.c.bqs.Tables.Patch(t.ProjectID, t.DatasetID, t.TableID, bqt).Context(ctx) - setClientHeader(call.Header()) + + tpc := &tablePatchCall{ + call: t.c.bqs.Tables.Patch(t.ProjectID, t.DatasetID, t.TableID, bqt).Context(ctx), + } + + for _, o := range opts { + o(tpc) + } + + setClientHeader(tpc.call.Header()) if etag != "" { - call.Header().Set("If-Match", etag) + tpc.call.Header().Set("If-Match", etag) } var res *bq.Table if err := runWithRetry(ctx, func() (err error) { - res, err = call.Do() + res, err = tpc.call.Do() return err }); err != nil { return nil, err From 06ac231eb11cbf4c8dbd6f6400ff201b3d819a49 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 9 Mar 2022 18:28:13 +0000 Subject: [PATCH 2/8] update naming0 --- bigquery/external.go | 8 ++++---- bigquery/external_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/bigquery/external.go b/bigquery/external.go index 902193c75b6..c5337a00216 100644 --- a/bigquery/external.go +++ b/bigquery/external.go @@ -105,10 +105,10 @@ type ExternalDataConfig struct { // StringTargetType supports all precision and scale values. DecimalTargetTypes []DecimalTargetType - // ConnectionId associates an external data configuration with a connection ID. + // ConnectionID associates an external data configuration with a connection ID. // Connections are managed through the BigQuery Connection API: // https://pkg.go.dev/cloud.google.com/go/bigquery/connection/apiv1 - ConnectionId string + ConnectionID string } func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration { @@ -120,7 +120,7 @@ func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration { IgnoreUnknownValues: e.IgnoreUnknownValues, MaxBadRecords: e.MaxBadRecords, HivePartitioningOptions: e.HivePartitioningOptions.toBQ(), - ConnectionId: e.ConnectionId, + ConnectionId: e.ConnectionID, } if e.Schema != nil { q.Schema = e.Schema.toBQ() @@ -144,7 +144,7 @@ func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfi MaxBadRecords: q.MaxBadRecords, Schema: bqToSchema(q.Schema), HivePartitioningOptions: bqToHivePartitioningOptions(q.HivePartitioningOptions), - ConnectionId: q.ConnectionId, + ConnectionID: q.ConnectionId, } for _, v := range q.DecimalTargetTypes { e.DecimalTargetTypes = append(e.DecimalTargetTypes, DecimalTargetType(v)) diff --git a/bigquery/external_test.go b/bigquery/external_test.go index a5b0233b1ba..db83771fc81 100644 --- a/bigquery/external_test.go +++ b/bigquery/external_test.go @@ -41,7 +41,7 @@ func TestExternalDataConfig(t *testing.T) { SkipLeadingRows: 3, NullMarker: "marker", }, - ConnectionId: "connection", + ConnectionID: "connection", }, { SourceFormat: GoogleSheets, From b70a7b206639dd76650fad55a0066952cf459549 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Thu, 17 Mar 2022 17:53:11 +0000 Subject: [PATCH 3/8] add ext modification to update, integration test --- bigquery/integration_test.go | 79 ++++++++++++++++++++++++++++++++++++ bigquery/table.go | 8 ++++ 2 files changed, 87 insertions(+) diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index 1c6f5c823e8..b287b348954 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -2330,6 +2330,85 @@ func TestIntegration_IteratorSource(t *testing.T) { } } +func TestIntegration_ExternalAutodetect(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + ctx := context.Background() + + testTable := dataset.Table(tableIDs.New()) + + origExtCfg := &ExternalDataConfig{ + SourceFormat: Avro, + SourceURIs: []string{"gs://cloud-samples-data/bigquery/autodetect-samples/original*.avro"}, + } + + err := testTable.Create(ctx, &TableMetadata{ + ExternalDataConfig: origExtCfg, + }) + if err != nil { + t.Fatalf("Table.Create(%q): %v", testTable.FullyQualifiedName(), err) + } + + origMeta, err := testTable.Metadata(ctx) + if err != nil { + t.Fatalf("Table.Metadata(%q): %v", testTable.FullyQualifiedName(), err) + } + + wantSchema := Schema{ + {Name: "stringfield", Type: "STRING"}, + {Name: "int64field", Type: "INTEGER"}, + } + if diff := testutil.Diff(origMeta.Schema, wantSchema); diff != "" { + t.Fatalf("orig schema, got=-, want=+\n%s", diff) + } + + // Now, point at the new files, but don't signal autodetect. + newExtCfg := &ExternalDataConfig{ + SourceFormat: Avro, + SourceURIs: []string{"gs://cloud-samples-data/bigquery/autodetect-samples/widened*.avro"}, + } + + newMeta, err := testTable.Update(ctx, TableMetadataToUpdate{ + ExternalDataConfig: newExtCfg, + }, origMeta.ETag) + if err != nil { + t.Fatalf("Table.Update(%q): %v", testTable.FullyQualifiedName(), err) + } + if diff := testutil.Diff(newMeta.Schema, wantSchema); diff != "" { + t.Fatalf("new schema, got=-, want=+\n%s", diff) + } + + // Now, signal autodetect in another update. + // This should yield a new schema. + newMeta2, err := testTable.Update(ctx, TableMetadataToUpdate{ExternalDataConfig: newExtCfg}, newMeta.ETag, WithAutoDetectSchema(true)) + if err != nil { + t.Fatalf("Table.Update(%q) with autodetect: %v", testTable.FullyQualifiedName(), err) + } + + t.Logf("uris: %v", newMeta2.ExternalDataConfig.SourceURIs) + + wantSchema2 := Schema{ + {Name: "stringfield", Type: "STRING"}, + {Name: "int64field", Type: "INTEGER"}, + {Name: "boolfield", Type: "BOOL"}, + } + if diff := testutil.Diff(newMeta2.Schema, wantSchema2); diff != "" { + t.Errorf("new schema after autodetect, got=-, want=+\n%s", diff) + } + + id, _ := testTable.Identifier(StandardSQLID) + q := client.Query(fmt.Sprintf("SELECT * FROM %s", id)) + it, err := q.Read(ctx) + if err != nil { + t.Fatalf("query read: %v", err) + } + wantRows := [][]Value{ + {"foo", int64(12), true}, + } + checkReadAndTotalRows(t, "row check", it, wantRows) +} + func TestIntegration_QueryExternalHivePartitioning(t *testing.T) { if client == nil { t.Skip("Integration tests skipped") diff --git a/bigquery/table.go b/bigquery/table.go index 314de03eb46..01b2f97cde2 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -787,6 +787,10 @@ func (tm *TableMetadataToUpdate) toBQ() (*bq.Table, error) { if tm.EncryptionConfig != nil { t.EncryptionConfiguration = tm.EncryptionConfig.toBQ() } + if tm.ExternalDataConfig != nil { + cfg := tm.ExternalDataConfig.toBQ() + t.ExternalDataConfiguration = &cfg + } if tm.Clustering != nil { t.Clustering = tm.Clustering.toBQ() @@ -873,6 +877,10 @@ type TableMetadataToUpdate struct { // set ExpirationTime to NeverExpire. The zero value is ignored. ExpirationTime time.Time + // ExternalDataConfig controls the definition of a table defined against + // an external source, such as one based on files in Google Cloud Storage. + ExternalDataConfig *ExternalDataConfig + // The query to use for a view. ViewQuery optional.String From d0c35f6cf37ea559de0ee50434b371aacd8e2cc1 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Thu, 17 Mar 2022 18:03:03 +0000 Subject: [PATCH 4/8] update expectations --- bigquery/integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index b287b348954..74438d13aa2 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -2404,7 +2404,7 @@ func TestIntegration_ExternalAutodetect(t *testing.T) { t.Fatalf("query read: %v", err) } wantRows := [][]Value{ - {"foo", int64(12), true}, + {"bar", int64(32), int64(314)}, } checkReadAndTotalRows(t, "row check", it, wantRows) } From e84b400a95c064355bf8ccafdf3223563d9b281f Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Thu, 17 Mar 2022 18:06:41 +0000 Subject: [PATCH 5/8] update test expectations --- bigquery/integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index 74438d13aa2..3d4f6de8bf5 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -2391,7 +2391,7 @@ func TestIntegration_ExternalAutodetect(t *testing.T) { wantSchema2 := Schema{ {Name: "stringfield", Type: "STRING"}, {Name: "int64field", Type: "INTEGER"}, - {Name: "boolfield", Type: "BOOL"}, + {Name: "otherfield", Type: "INTEGER"}, } if diff := testutil.Diff(newMeta2.Schema, wantSchema2); diff != "" { t.Errorf("new schema after autodetect, got=-, want=+\n%s", diff) From cdba5461b1f98654190488ee9a84d107decb279d Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Mon, 21 Mar 2022 16:53:54 +0000 Subject: [PATCH 6/8] push latest --- bigquery/integration_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index 3d4f6de8bf5..e87d61c0e26 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -2337,6 +2337,7 @@ func TestIntegration_ExternalAutodetect(t *testing.T) { ctx := context.Background() testTable := dataset.Table(tableIDs.New()) + t.Logf("testtable: %s", testTable.FullyQualifiedName()) origExtCfg := &ExternalDataConfig{ SourceFormat: Avro, From 04837fd2adb63c78d33d34a58d592b56f0bdfcbc Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 8 Apr 2022 00:12:34 +0000 Subject: [PATCH 7/8] remove logging --- bigquery/integration_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index 5509206a7c0..860bc072f7f 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -1586,7 +1586,6 @@ func TestIntegration_ExternalAutodetect(t *testing.T) { ctx := context.Background() testTable := dataset.Table(tableIDs.New()) - t.Logf("testtable: %s", testTable.FullyQualifiedName()) origExtCfg := &ExternalDataConfig{ SourceFormat: Avro, @@ -1631,7 +1630,7 @@ func TestIntegration_ExternalAutodetect(t *testing.T) { // Now, signal autodetect in another update. // This should yield a new schema. - newMeta2, err := testTable.Update(ctx, TableMetadataToUpdate{ExternalDataConfig: newExtCfg}, newMeta.ETag, WithAutoDetectSchema(true)) + newMeta2, err := testTable.Update(ctx, TableMetadataToUpdate{}, newMeta.ETag, WithAutoDetectSchema(true)) if err != nil { t.Fatalf("Table.Update(%q) with autodetect: %v", testTable.FullyQualifiedName(), err) } From 724752b7440b1f83a7938729cb78f124aa33e430 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Tue, 3 May 2022 17:27:16 +0000 Subject: [PATCH 8/8] omit debug logging --- bigquery/integration_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index 8313d5de79d..4dbf14a3c38 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -1675,8 +1675,6 @@ func TestIntegration_ExternalAutodetect(t *testing.T) { t.Fatalf("Table.Update(%q) with autodetect: %v", testTable.FullyQualifiedName(), err) } - t.Logf("uris: %v", newMeta2.ExternalDataConfig.SourceURIs) - wantSchema2 := Schema{ {Name: "stringfield", Type: "STRING"}, {Name: "int64field", Type: "INTEGER"},