Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): expose connections and schema autodetect modifier #5739

Merged
merged 18 commits into from May 3, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions bigquery/external.go
Expand Up @@ -104,6 +104,11 @@ type ExternalDataConfig struct {
//
// StringTargetType supports all precision and scale values.
DecimalTargetTypes []DecimalTargetType

// ConnectionID associates an external data configuration with a connection ID.
// Connections are managed through the BigQuery Connection API:
// https://pkg.go.dev/cloud.google.com/go/bigquery/connection/apiv1
ConnectionID string
}

func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
Expand All @@ -115,6 +120,7 @@ func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
IgnoreUnknownValues: e.IgnoreUnknownValues,
MaxBadRecords: e.MaxBadRecords,
HivePartitioningOptions: e.HivePartitioningOptions.toBQ(),
ConnectionId: e.ConnectionID,
}
if e.Schema != nil {
q.Schema = e.Schema.toBQ()
Expand All @@ -138,6 +144,7 @@ func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfi
MaxBadRecords: q.MaxBadRecords,
Schema: bqToSchema(q.Schema),
HivePartitioningOptions: bqToHivePartitioningOptions(q.HivePartitioningOptions),
ConnectionID: q.ConnectionId,
}
for _, v := range q.DecimalTargetTypes {
e.DecimalTargetTypes = append(e.DecimalTargetTypes, DecimalTargetType(v))
Expand Down
1 change: 1 addition & 0 deletions bigquery/external_test.go
Expand Up @@ -41,6 +41,7 @@ func TestExternalDataConfig(t *testing.T) {
SkipLeadingRows: 3,
NullMarker: "marker",
},
ConnectionID: "connection",
},
{
SourceFormat: GoogleSheets,
Expand Down
80 changes: 80 additions & 0 deletions bigquery/integration_test.go
Expand Up @@ -2330,6 +2330,86 @@ func TestIntegration_IteratorSource(t *testing.T) {
}
}

func TestIntegration_ExternalAutodetect(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()

testTable := dataset.Table(tableIDs.New())
t.Logf("testtable: %s", testTable.FullyQualifiedName())

origExtCfg := &ExternalDataConfig{
SourceFormat: Avro,
SourceURIs: []string{"gs://cloud-samples-data/bigquery/autodetect-samples/original*.avro"},
}

err := testTable.Create(ctx, &TableMetadata{
ExternalDataConfig: origExtCfg,
})
if err != nil {
t.Fatalf("Table.Create(%q): %v", testTable.FullyQualifiedName(), err)
}

origMeta, err := testTable.Metadata(ctx)
if err != nil {
t.Fatalf("Table.Metadata(%q): %v", testTable.FullyQualifiedName(), err)
}

wantSchema := Schema{
{Name: "stringfield", Type: "STRING"},
{Name: "int64field", Type: "INTEGER"},
}
if diff := testutil.Diff(origMeta.Schema, wantSchema); diff != "" {
t.Fatalf("orig schema, got=-, want=+\n%s", diff)
}

// Now, point at the new files, but don't signal autodetect.
newExtCfg := &ExternalDataConfig{
SourceFormat: Avro,
SourceURIs: []string{"gs://cloud-samples-data/bigquery/autodetect-samples/widened*.avro"},
}

newMeta, err := testTable.Update(ctx, TableMetadataToUpdate{
ExternalDataConfig: newExtCfg,
}, origMeta.ETag)
if err != nil {
t.Fatalf("Table.Update(%q): %v", testTable.FullyQualifiedName(), err)
}
if diff := testutil.Diff(newMeta.Schema, wantSchema); diff != "" {
t.Fatalf("new schema, got=-, want=+\n%s", diff)
}

// Now, signal autodetect in another update.
// This should yield a new schema.
newMeta2, err := testTable.Update(ctx, TableMetadataToUpdate{ExternalDataConfig: newExtCfg}, newMeta.ETag, WithAutoDetectSchema(true))
if err != nil {
t.Fatalf("Table.Update(%q) with autodetect: %v", testTable.FullyQualifiedName(), err)
}

t.Logf("uris: %v", newMeta2.ExternalDataConfig.SourceURIs)

wantSchema2 := Schema{
{Name: "stringfield", Type: "STRING"},
{Name: "int64field", Type: "INTEGER"},
{Name: "otherfield", Type: "INTEGER"},
}
if diff := testutil.Diff(newMeta2.Schema, wantSchema2); diff != "" {
t.Errorf("new schema after autodetect, got=-, want=+\n%s", diff)
}

id, _ := testTable.Identifier(StandardSQLID)
q := client.Query(fmt.Sprintf("SELECT * FROM %s", id))
it, err := q.Read(ctx)
if err != nil {
t.Fatalf("query read: %v", err)
}
wantRows := [][]Value{
{"bar", int64(32), int64(314)},
}
checkReadAndTotalRows(t, "row check", it, wantRows)
}

func TestIntegration_QueryExternalHivePartitioning(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
Expand Down
43 changes: 38 additions & 5 deletions bigquery/table.go
Expand Up @@ -713,23 +713,48 @@ func (t *Table) read(ctx context.Context, pf pageFetcher) *RowIterator {
// NeverExpire is a sentinel value used to remove a table'e expiration time.
var NeverExpire = time.Time{}.Add(-1)

// We use this for the option pattern rather than exposing the underlying
// discovery type directly.
type tablePatchCall struct {
call *bq.TablesPatchCall
}

// TableUpdateOption allow requests to update table metadata.
type TableUpdateOption func(*tablePatchCall)

// WithAutoDetectSchema governs whether the schema autodetection occurs as part of the table update.
// This is relevant in cases like external tables where schema is detected from the source data.
func WithAutoDetectSchema(b bool) TableUpdateOption {
return func(tpc *tablePatchCall) {
tpc.call.AutodetectSchema(b)
}
}

// Update modifies specific Table metadata fields.
func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag string) (md *TableMetadata, err error) {
func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag string, opts ...TableUpdateOption) (md *TableMetadata, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Table.Update")
defer func() { trace.EndSpan(ctx, err) }()

bqt, err := tm.toBQ()
if err != nil {
return nil, err
}
call := t.c.bqs.Tables.Patch(t.ProjectID, t.DatasetID, t.TableID, bqt).Context(ctx)
setClientHeader(call.Header())

tpc := &tablePatchCall{
call: t.c.bqs.Tables.Patch(t.ProjectID, t.DatasetID, t.TableID, bqt).Context(ctx),
}

for _, o := range opts {
o(tpc)
}

setClientHeader(tpc.call.Header())
if etag != "" {
call.Header().Set("If-Match", etag)
tpc.call.Header().Set("If-Match", etag)
}
var res *bq.Table
if err := runWithRetry(ctx, func() (err error) {
res, err = call.Do()
res, err = tpc.call.Do()
return err
}); err != nil {
return nil, err
Expand Down Expand Up @@ -762,6 +787,10 @@ func (tm *TableMetadataToUpdate) toBQ() (*bq.Table, error) {
if tm.EncryptionConfig != nil {
t.EncryptionConfiguration = tm.EncryptionConfig.toBQ()
}
if tm.ExternalDataConfig != nil {
cfg := tm.ExternalDataConfig.toBQ()
t.ExternalDataConfiguration = &cfg
}

if tm.Clustering != nil {
t.Clustering = tm.Clustering.toBQ()
Expand Down Expand Up @@ -848,6 +877,10 @@ type TableMetadataToUpdate struct {
// set ExpirationTime to NeverExpire. The zero value is ignored.
ExpirationTime time.Time

// ExternalDataConfig controls the definition of a table defined against
// an external source, such as one based on files in Google Cloud Storage.
ExternalDataConfig *ExternalDataConfig

// The query to use for a view.
ViewQuery optional.String

Expand Down