From c72e34fd79990eedaa56ed9e5121ab1a7fc4e2da Mon Sep 17 00:00:00 2001 From: shollyman Date: Tue, 3 May 2022 10:36:41 -0700 Subject: [PATCH] feat(bigquery): expose connections and schema autodetect modifier (#5739) * feat(bigquery): expose connections and schema autodetect modifier This exposes the connectionID in the external data config, and wires in the new autodetect schema as a variadic option on the Update() method for tables. Technically this is a breaking signature change, but in practice variadic options can be omitted entirely, so existing code will continue to work without issue. --- bigquery/external.go | 7 ++++ bigquery/external_test.go | 1 + bigquery/integration_test.go | 77 ++++++++++++++++++++++++++++++++++++ bigquery/table.go | 43 +++++++++++++++++--- 4 files changed, 123 insertions(+), 5 deletions(-) diff --git a/bigquery/external.go b/bigquery/external.go index afe255660d6..c5337a00216 100644 --- a/bigquery/external.go +++ b/bigquery/external.go @@ -104,6 +104,11 @@ type ExternalDataConfig struct { // // StringTargetType supports all precision and scale values. DecimalTargetTypes []DecimalTargetType + + // ConnectionID associates an external data configuration with a connection ID. 
+ // Connections are managed through the BigQuery Connection API: + // https://pkg.go.dev/cloud.google.com/go/bigquery/connection/apiv1 + ConnectionID string } func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration { @@ -115,6 +120,7 @@ func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration { IgnoreUnknownValues: e.IgnoreUnknownValues, MaxBadRecords: e.MaxBadRecords, HivePartitioningOptions: e.HivePartitioningOptions.toBQ(), + ConnectionId: e.ConnectionID, } if e.Schema != nil { q.Schema = e.Schema.toBQ() @@ -138,6 +144,7 @@ func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfi MaxBadRecords: q.MaxBadRecords, Schema: bqToSchema(q.Schema), HivePartitioningOptions: bqToHivePartitioningOptions(q.HivePartitioningOptions), + ConnectionID: q.ConnectionId, } for _, v := range q.DecimalTargetTypes { e.DecimalTargetTypes = append(e.DecimalTargetTypes, DecimalTargetType(v)) diff --git a/bigquery/external_test.go b/bigquery/external_test.go index 6eb930d74b4..db83771fc81 100644 --- a/bigquery/external_test.go +++ b/bigquery/external_test.go @@ -41,6 +41,7 @@ func TestExternalDataConfig(t *testing.T) { SkipLeadingRows: 3, NullMarker: "marker", }, + ConnectionID: "connection", }, { SourceFormat: GoogleSheets, diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index 5505170fef2..4dbf14a3c38 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -1619,6 +1619,83 @@ func TestIntegration_IteratorSource(t *testing.T) { } } +func TestIntegration_ExternalAutodetect(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + ctx := context.Background() + + testTable := dataset.Table(tableIDs.New()) + + origExtCfg := &ExternalDataConfig{ + SourceFormat: Avro, + SourceURIs: []string{"gs://cloud-samples-data/bigquery/autodetect-samples/original*.avro"}, + } + + err := testTable.Create(ctx, &TableMetadata{ + ExternalDataConfig: origExtCfg, + }) + if err != nil { + 
t.Fatalf("Table.Create(%q): %v", testTable.FullyQualifiedName(), err) + } + + origMeta, err := testTable.Metadata(ctx) + if err != nil { + t.Fatalf("Table.Metadata(%q): %v", testTable.FullyQualifiedName(), err) + } + + wantSchema := Schema{ + {Name: "stringfield", Type: "STRING"}, + {Name: "int64field", Type: "INTEGER"}, + } + if diff := testutil.Diff(origMeta.Schema, wantSchema); diff != "" { + t.Fatalf("orig schema, got=-, want=+\n%s", diff) + } + + // Now, point at the new files, but don't signal autodetect. + newExtCfg := &ExternalDataConfig{ + SourceFormat: Avro, + SourceURIs: []string{"gs://cloud-samples-data/bigquery/autodetect-samples/widened*.avro"}, + } + + newMeta, err := testTable.Update(ctx, TableMetadataToUpdate{ + ExternalDataConfig: newExtCfg, + }, origMeta.ETag) + if err != nil { + t.Fatalf("Table.Update(%q): %v", testTable.FullyQualifiedName(), err) + } + if diff := testutil.Diff(newMeta.Schema, wantSchema); diff != "" { + t.Fatalf("new schema, got=-, want=+\n%s", diff) + } + + // Now, signal autodetect in another update. + // This should yield a new schema. 
+ newMeta2, err := testTable.Update(ctx, TableMetadataToUpdate{}, newMeta.ETag, WithAutoDetectSchema(true)) + if err != nil { + t.Fatalf("Table.Update(%q) with autodetect: %v", testTable.FullyQualifiedName(), err) + } + + wantSchema2 := Schema{ + {Name: "stringfield", Type: "STRING"}, + {Name: "int64field", Type: "INTEGER"}, + {Name: "otherfield", Type: "INTEGER"}, + } + if diff := testutil.Diff(newMeta2.Schema, wantSchema2); diff != "" { + t.Errorf("new schema after autodetect, got=-, want=+\n%s", diff) + } + + id, _ := testTable.Identifier(StandardSQLID) + q := client.Query(fmt.Sprintf("SELECT * FROM %s", id)) + it, err := q.Read(ctx) + if err != nil { + t.Fatalf("query read: %v", err) + } + wantRows := [][]Value{ + {"bar", int64(32), int64(314)}, + } + checkReadAndTotalRows(t, "row check", it, wantRows) +} + func TestIntegration_QueryExternalHivePartitioning(t *testing.T) { if client == nil { t.Skip("Integration tests skipped") diff --git a/bigquery/table.go b/bigquery/table.go index 2a67fa3971d..abf5aa69dea 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -758,8 +758,25 @@ func (t *Table) read(ctx context.Context, pf pageFetcher) *RowIterator { // NeverExpire is a sentinel value used to remove a table'e expiration time. var NeverExpire = time.Time{}.Add(-1) +// We use this for the option pattern rather than exposing the underlying +// discovery type directly. +type tablePatchCall struct { + call *bq.TablesPatchCall +} + +// TableUpdateOption allows requests to update table metadata. +type TableUpdateOption func(*tablePatchCall) + +// WithAutoDetectSchema governs whether the schema autodetection occurs as part of the table update. +// This is relevant in cases like external tables where schema is detected from the source data. +func WithAutoDetectSchema(b bool) TableUpdateOption { + return func(tpc *tablePatchCall) { + tpc.call.AutodetectSchema(b) + } +} + // Update modifies specific Table metadata fields. 
-func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag string) (md *TableMetadata, err error) { +func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag string, opts ...TableUpdateOption) (md *TableMetadata, err error) { ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Table.Update") defer func() { trace.EndSpan(ctx, err) }() @@ -767,14 +784,22 @@ func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag strin if err != nil { return nil, err } - call := t.c.bqs.Tables.Patch(t.ProjectID, t.DatasetID, t.TableID, bqt).Context(ctx) - setClientHeader(call.Header()) + + tpc := &tablePatchCall{ + call: t.c.bqs.Tables.Patch(t.ProjectID, t.DatasetID, t.TableID, bqt).Context(ctx), + } + + for _, o := range opts { + o(tpc) + } + + setClientHeader(tpc.call.Header()) if etag != "" { - call.Header().Set("If-Match", etag) + tpc.call.Header().Set("If-Match", etag) } var res *bq.Table if err := runWithRetry(ctx, func() (err error) { - res, err = call.Do() + res, err = tpc.call.Do() return err }); err != nil { return nil, err @@ -807,6 +832,10 @@ func (tm *TableMetadataToUpdate) toBQ() (*bq.Table, error) { if tm.EncryptionConfig != nil { t.EncryptionConfiguration = tm.EncryptionConfig.toBQ() } + if tm.ExternalDataConfig != nil { + cfg := tm.ExternalDataConfig.toBQ() + t.ExternalDataConfiguration = &cfg + } if tm.Clustering != nil { t.Clustering = tm.Clustering.toBQ() @@ -893,6 +922,10 @@ type TableMetadataToUpdate struct { // set ExpirationTime to NeverExpire. The zero value is ignored. ExpirationTime time.Time + // ExternalDataConfig controls the definition of a table defined against + // an external source, such as one based on files in Google Cloud Storage. + ExternalDataConfig *ExternalDataConfig + // The query to use for a view. ViewQuery optional.String