From 6d0968722352895f2d8034e5c079bd406e3ca4b2 Mon Sep 17 00:00:00 2001 From: Brenna N Epp Date: Mon, 29 Nov 2021 20:45:26 +0000 Subject: [PATCH 1/7] feat(storage): retry copy funcs on idempotent conds (#5172) * retry objects.compose on idempotent conds * retry objects.rewrite on idempotent conds --- storage/copy.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/storage/copy.go b/storage/copy.go index 61983df5ada..f180753520c 100644 --- a/storage/copy.go +++ b/storage/copy.go @@ -138,8 +138,11 @@ func (c *Copier) callRewrite(ctx context.Context, rawObj *raw.Object) (*raw.Rewr var res *raw.RewriteResponse var err error setClientHeader(call.Header()) - err = runWithRetry(ctx, func() error { res, err = call.Do(); return err }) - if err != nil { + + retryCall := func() error { res, err = call.Do(); return err } + isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist) + + if err := run(ctx, retryCall, c.dst.retry, isIdempotent); err != nil { return nil, err } c.RewriteToken = res.RewriteToken @@ -230,8 +233,11 @@ func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) { } var obj *raw.Object setClientHeader(call.Header()) - err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err }) - if err != nil { + + retryCall := func() error { obj, err = call.Do(); return err } + isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist) + + if err := run(ctx, retryCall, c.dst.retry, isIdempotent); err != nil { return nil, err } return newObject(obj), nil From f58a9f7b88e2ce6101cb4bd3c85c267a688a1a1d Mon Sep 17 00:00:00 2001 From: shollyman Date: Mon, 29 Nov 2021 23:15:54 -0800 Subject: [PATCH 2/7] chore(bigquery): release 1.25.0 (#5128) Release-As: 1.25.0 --- bigquery/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery/README.md b/bigquery/README.md index 94f5c6d306a..ab52b629891 100644 --- a/bigquery/README.md +++ b/bigquery/README.md @@ -1,5 +1,5 @@ ## BigQuery [![Go Reference](https://pkg.go.dev/badge/cloud.google.com/go/bigquery.svg)](https://pkg.go.dev/cloud.google.com/go/bigquery) - + - [About BigQuery](https://cloud.google.com/bigquery/) - [API documentation](https://cloud.google.com/bigquery/docs) - [Go client documentation](https://pkg.go.dev/cloud.google.com/go/bigquery) From c9cd9846b6707d236648d33d44434e64eced9cdd Mon Sep 17 00:00:00 2001 From: shollyman Date: Tue, 30 Nov 2021 16:11:07 -0800 Subject: [PATCH 3/7] feat(bigquery): expose identifiers using a variety of formats (#5017) * feat(bigquery): expose identifiers using a variety of formats This PR adds an Identifier() method to common BQ resources so that users can get an identifier that is formatted appropriately for their use case (legacy sql, standard sql, referencing in storage API, etc). Existing instances of FullyQualifiedName() have been migrated to the new method. 
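As a minimal usage sketch (assuming an initialized *bigquery.Client named
client, a context.Context named ctx, and hypothetical resource IDs), callers
can now resolve a reference instead of hand-formatting it:

    // Resolve a Standard SQL identifier and use it in a query.
    tbl := client.Dataset("samples").Table("widgets")
    tblID, err := tbl.Identifier(bigquery.StandardSQLID)
    if err != nil {
        // handle unsupported-format error
    }
    sql := fmt.Sprintf("SELECT COUNT(*) FROM %s", tblID)
    it, err := client.Query(sql).Read(ctx) // iterate it as usual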
Fixes: https://github.com/googleapis/google-cloud-go/issues/1955 --- bigquery/dataset.go | 20 ++++++++++++ bigquery/dataset_test.go | 60 +++++++++++++++++++++++++++++++++++ bigquery/integration_test.go | 38 ++++++++++++---------- bigquery/model.go | 25 ++++++++++++++- bigquery/model_test.go | 61 ++++++++++++++++++++++++++++++++++++ bigquery/routine.go | 20 +++++++++++- bigquery/routine_test.go | 60 +++++++++++++++++++++++++++++++++++ bigquery/table.go | 39 ++++++++++++++++++++++- bigquery/table_test.go | 61 ++++++++++++++++++++++++++++++++++++ 9 files changed, 365 insertions(+), 19 deletions(-) diff --git a/bigquery/dataset.go b/bigquery/dataset.go index 91b649eed8c..7f9115a5c2f 100644 --- a/bigquery/dataset.go +++ b/bigquery/dataset.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "strings" "time" "cloud.google.com/go/internal/optional" @@ -88,6 +89,25 @@ func (c *Client) DatasetInProject(projectID, datasetID string) *Dataset { } } +// Identifier returns the ID of the dataset in the requested format. +// +// For Standard SQL format, the identifier will be quoted if the +// ProjectID contains dash (-) characters. +func (d *Dataset) Identifier(f IdentifierFormat) (string, error) { + switch f { + case LegacySQLID: + return fmt.Sprintf("%s:%s", d.ProjectID, d.DatasetID), nil + case StandardSQLID: + // Quote project identifiers if they have a dash character. + if strings.Contains(d.ProjectID, "-") { + return fmt.Sprintf("`%s`.%s", d.ProjectID, d.DatasetID), nil + } + return fmt.Sprintf("%s.%s", d.ProjectID, d.DatasetID), nil + default: + return "", ErrUnknownIdentifierFormat + } +} + // Create creates a dataset in the BigQuery service. An error will be returned if the // dataset already exists. Pass in a DatasetMetadata value to configure the dataset. 
func (d *Dataset) Create(ctx context.Context, md *DatasetMetadata) (err error) { diff --git a/bigquery/dataset_test.go b/bigquery/dataset_test.go index 351947b73e9..fb4d3ea2ee2 100644 --- a/bigquery/dataset_test.go +++ b/bigquery/dataset_test.go @@ -476,3 +476,63 @@ func TestConvertAccessEntry(t *testing.T) { t.Error("got nil, want error") } } + +func TestDatasetIdentifiers(t *testing.T) { + testDataset := &Dataset{ + ProjectID: "p", + DatasetID: "d", + c: nil, + } + for _, tc := range []struct { + description string + in *Dataset + format IdentifierFormat + want string + wantErr bool + }{ + { + description: "empty format string", + in: testDataset, + format: "", + wantErr: true, + }, + { + description: "legacy", + in: testDataset, + format: LegacySQLID, + want: "p:d", + }, + { + description: "standard unquoted", + in: testDataset, + format: StandardSQLID, + want: "p.d", + }, + { + description: "standard w/quoting", + in: &Dataset{ProjectID: "p-p", DatasetID: "d"}, + format: StandardSQLID, + want: "`p-p`.d", + }, + { + description: "api resource", + in: testDataset, + format: StorageAPIResourceID, + wantErr: true, + }, + } { + got, err := tc.in.Identifier(tc.format) + if tc.wantErr && err == nil { + t.Errorf("case %q: wanted err, was success", tc.description) + } + if !tc.wantErr { + if err != nil { + t.Errorf("case %q: wanted success, got err: %v", tc.description, err) + } else { + if got != tc.want { + t.Errorf("case %q: got %s, want %s", tc.description, got, tc.want) + } + } + } + } +} diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index 766c56639ae..bc8b96a137d 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -371,12 +371,12 @@ func TestIntegration_TableCreateView(t *testing.T) { } ctx := context.Background() table := newTable(t, schema) + tableIdentifier, _ := table.Identifier(StandardSQLID) defer table.Delete(ctx) // Test that standard SQL views work. view := dataset.Table("t_view_standardsql") - query := fmt.Sprintf("SELECT APPROX_COUNT_DISTINCT(name) FROM `%s.%s.%s`", - dataset.ProjectID, dataset.DatasetID, table.TableID) + query := fmt.Sprintf("SELECT APPROX_COUNT_DISTINCT(name) FROM %s", tableIdentifier) err := view.Create(context.Background(), &TableMetadata{ ViewQuery: query, UseStandardSQL: true, @@ -936,10 +936,11 @@ func TestIntegration_DatasetUpdateAccess(t *testing.T) { // Create a sample UDF so we can verify adding authorized UDFs routineID := routineIDs.New() routine := dataset.Routine(routineID) + routineSQLID, _ := routine.Identifier(StandardSQLID) sql := fmt.Sprintf(` - CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`, - routine.FullyQualifiedName()) + CREATE FUNCTION %s(x INT64) AS (x * 3);`, + routineSQLID) if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatal(err) } @@ -1348,13 +1349,14 @@ func TestIntegration_RoutineStoredProcedure(t *testing.T) { // Define a simple stored procedure via DDL. routineID := routineIDs.New() routine := dataset.Routine(routineID) + routineSQLID, _ := routine.Identifier(StandardSQLID) sql := fmt.Sprintf(` - CREATE OR REPLACE PROCEDURE `+"`%s`"+`(val INT64) + CREATE OR REPLACE PROCEDURE %s(val INT64) BEGIN SELECT CURRENT_TIMESTAMP() as ts; SELECT val * 2 as f2; END`, - routine.FullyQualifiedName()) + routineSQLID) if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatal(err) @@ -1363,8 +1365,8 @@ func TestIntegration_RoutineStoredProcedure(t *testing.T) { // Invoke the stored procedure. 
sql = fmt.Sprintf(` - CALL `+"`%s`"+`(5)`, - routine.FullyQualifiedName()) + CALL %s(5)`, + routineSQLID) q := client.Query(sql) it, err := q.Read(ctx) @@ -2354,8 +2356,10 @@ func TestIntegration_QueryExternalHivePartitioning(t *testing.T) { } defer customTable.Delete(ctx) + customTableSQLID, _ := customTable.Identifier(StandardSQLID) + // Issue a test query that prunes based on the custom hive partitioning key, and verify the result is as expected. - sql := fmt.Sprintf("SELECT COUNT(*) as ct FROM `%s`.%s.%s WHERE pkey=\"foo\"", customTable.ProjectID, customTable.DatasetID, customTable.TableID) + sql := fmt.Sprintf("SELECT COUNT(*) as ct FROM %s WHERE pkey=\"foo\"", customTableSQLID) q := client.Query(sql) it, err := q.Read(ctx) if err != nil { @@ -3227,10 +3231,10 @@ func TestIntegration_ModelLifecycle(t *testing.T) { // Create a model via a CREATE MODEL query modelID := modelIDs.New() model := dataset.Model(modelID) - modelRef := fmt.Sprintf("%s.%s.%s", dataset.ProjectID, dataset.DatasetID, modelID) + modelSQLID, _ := model.Identifier(StandardSQLID) sql := fmt.Sprintf(` - CREATE MODEL `+"`%s`"+` + CREATE MODEL %s OPTIONS ( model_type='linear_reg', max_iteration=1, @@ -3240,7 +3244,7 @@ func TestIntegration_ModelLifecycle(t *testing.T) { SELECT 'a' AS f1, 2.0 AS label UNION ALL SELECT 'b' AS f1, 3.8 AS label - )`, modelRef) + )`, modelSQLID) if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatal(err) } @@ -3417,13 +3421,14 @@ func TestIntegration_RoutineComplexTypes(t *testing.T) { routineID := routineIDs.New() routine := dataset.Routine(routineID) + routineSQLID, _ := routine.Identifier(StandardSQLID) sql := fmt.Sprintf(` - CREATE FUNCTION `+"`%s`("+` + CREATE FUNCTION %s( arr ARRAY> ) AS ( (SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem) )`, - routine.FullyQualifiedName()) + routineSQLID) if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatal(err) } @@ -3480,10 +3485,11 @@ func TestIntegration_RoutineLifecycle(t *testing.T) { // Create a scalar UDF routine via a CREATE FUNCTION query routineID := routineIDs.New() routine := dataset.Routine(routineID) + routineSQLID, _ := routine.Identifier(StandardSQLID) sql := fmt.Sprintf(` - CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`, - routine.FullyQualifiedName()) + CREATE FUNCTION %s(x INT64) AS (x * 3);`, + routineSQLID) if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatal(err) } diff --git a/bigquery/model.go b/bigquery/model.go index 363133bf66d..56260efe9c3 100644 --- a/bigquery/model.go +++ b/bigquery/model.go @@ -17,6 +17,7 @@ package bigquery import ( "context" "fmt" + "strings" "time" "cloud.google.com/go/internal/optional" @@ -41,9 +42,31 @@ type Model struct { c *Client } +// Identifier returns the ID of the model in the requested format. +// +// For Standard SQL format, the identifier will be quoted if the +// ProjectID contains dash (-) characters. +func (m *Model) Identifier(f IdentifierFormat) (string, error) { + switch f { + case LegacySQLID: + return fmt.Sprintf("%s:%s.%s", m.ProjectID, m.DatasetID, m.ModelID), nil + case StandardSQLID: + // Per https://cloud.google.com/bigquery-ml/docs/reference/standard-sql/bigqueryml-syntax-create#model_name + // we quote the entire identifier. + out := fmt.Sprintf("%s.%s.%s", m.ProjectID, m.DatasetID, m.ModelID) + if strings.Contains(out, "-") { + out = fmt.Sprintf("`%s`", out) + } + return out, nil + default: + return "", ErrUnknownIdentifierFormat + } +} + // FullyQualifiedName returns the ID of the model in projectID:datasetID.modelid format. 
func (m *Model) FullyQualifiedName() string { - return fmt.Sprintf("%s:%s.%s", m.ProjectID, m.DatasetID, m.ModelID) + s, _ := m.Identifier(LegacySQLID) + return s } func (m *Model) toBQ() *bq.ModelReference { diff --git a/bigquery/model_test.go b/bigquery/model_test.go index 98b38c864e5..d46bd296312 100644 --- a/bigquery/model_test.go +++ b/bigquery/model_test.go @@ -120,3 +120,64 @@ func TestModelMetadataUpdateToBQ(t *testing.T) { } } } + +func TestModelIdentifiers(t *testing.T) { + testModel := &Model{ + ProjectID: "p", + DatasetID: "d", + ModelID: "m", + c: nil, + } + for _, tc := range []struct { + description string + in *Model + format IdentifierFormat + want string + wantErr bool + }{ + { + description: "empty format string", + in: testModel, + format: "", + wantErr: true, + }, + { + description: "legacy", + in: testModel, + format: LegacySQLID, + want: "p:d.m", + }, + { + description: "standard unquoted", + in: testModel, + format: StandardSQLID, + want: "p.d.m", + }, + { + description: "standard w/dash", + in: &Model{ProjectID: "p-p", DatasetID: "d", ModelID: "m"}, + format: StandardSQLID, + want: "`p-p.d.m`", + }, + { + description: "api resource", + in: testModel, + format: StorageAPIResourceID, + wantErr: true, + }, + } { + got, err := tc.in.Identifier(tc.format) + if tc.wantErr && err == nil { + t.Errorf("case %q: wanted err, was success", tc.description) + } + if !tc.wantErr { + if err != nil { + t.Errorf("case %q: wanted success, got err: %v", tc.description, err) + } else { + if got != tc.want { + t.Errorf("case %q: got %s, want %s", tc.description, got, tc.want) + } + } + } + } +} diff --git a/bigquery/routine.go b/bigquery/routine.go index 46c8ca398f9..20011cf3a17 100644 --- a/bigquery/routine.go +++ b/bigquery/routine.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "strings" "time" "cloud.google.com/go/internal/optional" @@ -44,9 +45,26 @@ func (r *Routine) toBQ() *bq.RoutineReference { } } +// Identifier returns the ID of the routine in the requested format. +// +// For Standard SQL format, the identifier will be quoted if the +// ProjectID contains dash (-) characters. +func (r *Routine) Identifier(f IdentifierFormat) (string, error) { + switch f { + case StandardSQLID: + if strings.Contains(r.ProjectID, "-") { + return fmt.Sprintf("`%s`.%s.%s", r.ProjectID, r.DatasetID, r.RoutineID), nil + } + return fmt.Sprintf("%s.%s.%s", r.ProjectID, r.DatasetID, r.RoutineID), nil + default: + return "", ErrUnknownIdentifierFormat + } +} + // FullyQualifiedName returns an identifer for the routine in project.dataset.routine format. func (r *Routine) FullyQualifiedName() string { - return fmt.Sprintf("%s.%s.%s", r.ProjectID, r.DatasetID, r.RoutineID) + s, _ := r.Identifier(StandardSQLID) + return s } // Create creates a Routine in the BigQuery service. 
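A minimal sketch of the routine rules above (hypothetical IDs; client is an
initialized *bigquery.Client): a dashed project is quoted, and because
routines only define a Standard SQL form, other formats report an error.

r := client.DatasetInProject("my-project", "mydataset").Routine("fn")
id, _ := r.Identifier(bigquery.StandardSQLID) // "`my-project`.mydataset.fn"
_, err := r.Identifier(bigquery.LegacySQLID)  // err == bigquery.ErrUnknownIdentifierFormat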
diff --git a/bigquery/routine_test.go b/bigquery/routine_test.go index 697713c86da..59210ff9a01 100644 --- a/bigquery/routine_test.go +++ b/bigquery/routine_test.go @@ -167,3 +167,63 @@ func TestRoutineTypeConversions(t *testing.T) { }) } } + +func TestRoutineIdentifiers(t *testing.T) { + testRoutine := &Routine{ + ProjectID: "p", + DatasetID: "d", + RoutineID: "r", + c: nil, + } + for _, tc := range []struct { + description string + in *Routine + format IdentifierFormat + want string + wantErr bool + }{ + { + description: "empty format string", + in: testRoutine, + format: "", + wantErr: true, + }, + { + description: "legacy", + in: testRoutine, + wantErr: true, + }, + { + description: "standard unquoted", + in: testRoutine, + format: StandardSQLID, + want: "p.d.r", + }, + { + description: "standard w/dash", + in: &Routine{ProjectID: "p-p", DatasetID: "d", RoutineID: "r"}, + format: StandardSQLID, + want: "`p-p`.d.r", + }, + { + description: "api resource", + in: testRoutine, + format: StorageAPIResourceID, + wantErr: true, + }, + } { + got, err := tc.in.Identifier(tc.format) + if tc.wantErr && err == nil { + t.Errorf("case %q: wanted err, was success", tc.description) + } + if !tc.wantErr { + if err != nil { + t.Errorf("case %q: wanted success, got err: %v", tc.description, err) + } else { + if got != tc.want { + t.Errorf("case %q: got %s, want %s", tc.description, got, tc.want) + } + } + } + } +} diff --git a/bigquery/table.go b/bigquery/table.go index 202b2f737f2..764e6b27a7c 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -471,9 +471,46 @@ func (t *Table) toBQ() *bq.TableReference { } } +// IdentifierFormat represents a how certain resource identifiers such as table references +// are formatted. +type IdentifierFormat string + +var ( + // StandardSQLID returns an identifier suitable for use with Standard SQL. + StandardSQLID IdentifierFormat = "SQL" + + // LegacySQLID returns an identifier suitable for use with Legacy SQL. + LegacySQLID IdentifierFormat = "LEGACY_SQL" + + // StorageAPIResourceID returns an identifier suitable for use with the Storage API. Namely, it's for formatting + // a table resource for invoking read and write functionality. + StorageAPIResourceID IdentifierFormat = "STORAGE_API_RESOURCE" + + // ErrUnknownIdentifierFormat is indicative of requesting an identifier in a format that is + // not supported. + ErrUnknownIdentifierFormat = errors.New("unknown identifier format") +) + +// Identifier returns the ID of the table in the requested format. +func (t *Table) Identifier(f IdentifierFormat) (string, error) { + switch f { + case LegacySQLID: + return fmt.Sprintf("%s:%s.%s", t.ProjectID, t.DatasetID, t.TableID), nil + case StorageAPIResourceID: + return fmt.Sprintf("projects/%s/datasets/%s/tables/%s", t.ProjectID, t.DatasetID, t.TableID), nil + case StandardSQLID: + // Note we don't need to quote the project ID here, as StandardSQL has special rules to allow + // dash identifiers for projects without issue in table identifiers. + return fmt.Sprintf("%s.%s.%s", t.ProjectID, t.DatasetID, t.TableID), nil + default: + return "", ErrUnknownIdentifierFormat + } +} + // FullyQualifiedName returns the ID of the table in projectID:datasetID.tableID format. func (t *Table) FullyQualifiedName() string { - return fmt.Sprintf("%s:%s.%s", t.ProjectID, t.DatasetID, t.TableID) + s, _ := t.Identifier(LegacySQLID) + return s } // implicitTable reports whether Table is an empty placeholder, which signifies that a new table should be created with an auto-generated Table ID. 
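A minimal sketch of the three table formats defined above (hypothetical IDs;
client is an initialized *bigquery.Client). Unlike models and routines, a
dashed project needs no quoting in the Standard SQL table form:

tbl := client.DatasetInProject("my-project", "d").Table("t")
std, _ := tbl.Identifier(bigquery.StandardSQLID)        // "my-project.d.t"
legacy, _ := tbl.Identifier(bigquery.LegacySQLID)       // "my-project:d.t"
res, _ := tbl.Identifier(bigquery.StorageAPIResourceID) // "projects/my-project/datasets/d/tables/t"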
diff --git a/bigquery/table_test.go b/bigquery/table_test.go index bc0d47e66ad..d56c71c2bc1 100644 --- a/bigquery/table_test.go +++ b/bigquery/table_test.go @@ -441,3 +441,64 @@ func TestTableMetadataToUpdateToBQErrors(t *testing.T) { } } } + +func TestTableIdentifiers(t *testing.T) { + testTable := &Table{ + ProjectID: "p", + DatasetID: "d", + TableID: "t", + c: nil, + } + for _, tc := range []struct { + description string + in *Table + format IdentifierFormat + want string + wantErr bool + }{ + { + description: "empty format string", + in: testTable, + format: "", + wantErr: true, + }, + { + description: "legacy", + in: testTable, + format: LegacySQLID, + want: "p:d.t", + }, + { + description: "standard unquoted", + in: testTable, + format: StandardSQLID, + want: "p.d.t", + }, + { + description: "standard w/dash", + in: &Table{ProjectID: "p-p", DatasetID: "d", TableID: "t"}, + format: StandardSQLID, + want: "p-p.d.t", + }, + { + description: "api resource", + in: testTable, + format: StorageAPIResourceID, + want: "projects/p/datasets/d/tables/t", + }, + } { + got, err := tc.in.Identifier(tc.format) + if tc.wantErr && err == nil { + t.Errorf("case %q: wanted err, was success", tc.description) + } + if !tc.wantErr { + if err != nil { + t.Errorf("case %q: wanted success, got err: %v", tc.description, err) + } else { + if got != tc.want { + t.Errorf("case %q: got %s, want %s", tc.description, got, tc.want) + } + } + } + } +} From 014b314b2db70147a26120a1d54a6bc7142d5665 Mon Sep 17 00:00:00 2001 From: shollyman Date: Wed, 1 Dec 2021 09:52:08 -0800 Subject: [PATCH 4/7] feat(bigquery/storage/managedwriter): support variadic appends (#5102) * feat(bigquery/storage/managedwriter): support variadic AppendOption --- .../storage/managedwriter/appendresult.go | 30 ++- .../managedwriter/appendresult_test.go | 60 ++--- .../storage/managedwriter/integration_test.go | 108 ++++++++- .../storage/managedwriter/managed_stream.go | 38 +++- .../managedwriter/managed_stream_test.go | 10 +- .../testdata/messages_proto2.pb.go | 210 ++++++++++++------ .../testdata/messages_proto2.proto | 10 + .../storage/managedwriter/testdata/schemas.go | 6 + .../storage/managedwriter/testutils_test.go | 12 +- 9 files changed, 362 insertions(+), 122 deletions(-) diff --git a/bigquery/storage/managedwriter/appendresult.go b/bigquery/storage/managedwriter/appendresult.go index 020d0de741a..8a9d3323976 100644 --- a/bigquery/storage/managedwriter/appendresult.go +++ b/bigquery/storage/managedwriter/appendresult.go @@ -19,6 +19,7 @@ import ( storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/descriptorpb" "google.golang.org/protobuf/types/known/wrapperspb" ) @@ -66,7 +67,9 @@ func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) { // append request. type pendingWrite struct { request *storagepb.AppendRowsRequest - result *AppendResult + // for schema evolution cases, accept a new schema + newSchema *descriptorpb.DescriptorProto + result *AppendResult // this is used by the flow controller. reqSize int @@ -77,7 +80,7 @@ type pendingWrite struct { // that in the future, we may want to allow row batching to be managed by // the server (e.g. for default/COMMITTED streams). For BUFFERED/PENDING // streams, this should be managed by the user. 
-func newPendingWrite(appends [][]byte, offset int64) *pendingWrite { +func newPendingWrite(appends [][]byte) *pendingWrite { pw := &pendingWrite{ request: &storagepb.AppendRowsRequest{ Rows: &storagepb.AppendRowsRequest_ProtoRows{ @@ -90,9 +93,6 @@ func newPendingWrite(appends [][]byte, offset int64) *pendingWrite { }, result: newAppendResult(appends), } - if offset > 0 { - pw.request.Offset = &wrapperspb.Int64Value{Value: offset} - } // We compute the size now for flow controller purposes, though // the actual request size may be slightly larger (e.g. the first // request in a new stream bears schema and stream id). @@ -114,3 +114,23 @@ func (pw *pendingWrite) markDone(startOffset int64, err error, fc *flowControlle fc.release(pw.reqSize) } } + +// AppendOption are options that can be passed when appending data with a managed stream instance. +type AppendOption func(*pendingWrite) + +// UpdateSchemaDescriptor is used to update the descriptor message schema associated +// with a given stream. +func UpdateSchemaDescriptor(schema *descriptorpb.DescriptorProto) AppendOption { + return func(pw *pendingWrite) { + pw.newSchema = schema + } +} + +// WithOffset sets an explicit offset value for this append request. +func WithOffset(offset int64) AppendOption { + return func(pw *pendingWrite) { + pw.request.Offset = &wrapperspb.Int64Value{ + Value: offset, + } + } +} diff --git a/bigquery/storage/managedwriter/appendresult_test.go b/bigquery/storage/managedwriter/appendresult_test.go index e30a53fb6d2..d760f5b8e97 100644 --- a/bigquery/storage/managedwriter/appendresult_test.go +++ b/bigquery/storage/managedwriter/appendresult_test.go @@ -43,38 +43,33 @@ func TestPendingWrite(t *testing.T) { []byte("row3"), } - var wantOffset int64 = 99 - - // first, verify no offset behavior - pending := newPendingWrite(wantRowData, NoStreamOffset) + // verify no offset behavior + pending := newPendingWrite(wantRowData) if pending.request.GetOffset() != nil { t.Errorf("request should have no offset, but is present: %q", pending.request.GetOffset().GetValue()) } - pending.markDone(NoStreamOffset, nil, nil) - if pending.result.offset != NoStreamOffset { - t.Errorf("mismatch on completed AppendResult without offset: got %d want %d", pending.result.offset, NoStreamOffset) - } - if pending.result.err != nil { - t.Errorf("mismatch in error on AppendResult, got %v want nil", pending.result.err) - } - - // now, verify behavior with a valid offset - pending = newPendingWrite(wantRowData, 99) - if pending.request.GetOffset() == nil { - t.Errorf("offset not set, should be %d", wantOffset) - } - if gotOffset := pending.request.GetOffset().GetValue(); gotOffset != wantOffset { - t.Errorf("offset mismatch, got %d want %d", gotOffset, wantOffset) - } - // check request shape gotRowCount := len(pending.request.GetProtoRows().GetRows().GetSerializedRows()) if gotRowCount != len(wantRowData) { t.Errorf("pendingWrite request mismatch, got %d rows, want %d rows", gotRowCount, len(wantRowData)) } - // verify AppendResult + // Verify request is not acknowledged. + select { + case <-pending.result.Ready(): + t.Errorf("got Ready() on incomplete AppendResult") + case <-time.After(100 * time.Millisecond): + + } + // Mark completed, verify result. 
+ pending.markDone(NoStreamOffset, nil, nil) + if pending.result.offset != NoStreamOffset { + t.Errorf("mismatch on completed AppendResult without offset: got %d want %d", pending.result.offset, NoStreamOffset) + } + if pending.result.err != nil { + t.Errorf("mismatch in error on AppendResult, got %v want nil", pending.result.err) + } gotData := pending.result.rowData if len(gotData) != len(wantRowData) { t.Errorf("length mismatch on appendresult, got %d, want %d", len(gotData), len(wantRowData)) @@ -84,15 +79,24 @@ func TestPendingWrite(t *testing.T) { t.Errorf("row %d mismatch in data: got %q want %q", i, gotData[i], wantRowData[i]) } } - select { - case <-pending.result.Ready(): - t.Errorf("got Ready() on incomplete AppendResult") - case <-time.After(100 * time.Millisecond): - } + // Create new write to verify error result. + pending = newPendingWrite(wantRowData) - // verify completion behavior + // Manually invoke option to apply offset to request. + // This would normally be appied as part of the AppendRows() method on the managed stream. reportedOffset := int64(101) + f := WithOffset(reportedOffset) + f(pending) + + if pending.request.GetOffset() == nil { + t.Errorf("expected offset, got none") + } + if pending.request.GetOffset().GetValue() != reportedOffset { + t.Errorf("offset mismatch, got %d wanted %d", pending.request.GetOffset().GetValue(), reportedOffset) + } + + // Verify completion behavior with an error. wantErr := fmt.Errorf("foo") pending.markDone(reportedOffset, wantErr, nil) diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index d0ebca0f5de..7573c23a4fc 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -144,6 +144,10 @@ func TestIntegration_ManagedWriter(t *testing.T) { t.Parallel() testPendingStream(ctx, t, mwClient, bqClient, dataset) }) + t.Run("SchemaEvolution", func(t *testing.T) { + t.Parallel() + testSchemaEvolution(ctx, t, mwClient, bqClient, dataset) + }) t.Run("Instrumentation", func(t *testing.T) { // Don't run this in parallel, we only want to collect stats from this subtest. 
testInstrumentation(ctx, t, mwClient, bqClient, dataset) @@ -181,7 +185,7 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl t.Errorf("failed to marshal message %d: %v", k, err) } data := [][]byte{b} - result, err = ms.AppendRows(ctx, data, NoStreamOffset) + result, err = ms.AppendRows(ctx, data) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } @@ -201,7 +205,7 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl } data = append(data, b) } - result, err = ms.AppendRows(ctx, data, NoStreamOffset) + result, err = ms.AppendRows(ctx, data) if err != nil { t.Errorf("grouped-row append failed: %v", err) } @@ -256,7 +260,7 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C if err != nil { t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err) } - result, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset) + result, err = ms.AppendRows(ctx, [][]byte{b}) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } @@ -305,7 +309,7 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC t.Errorf("failed to marshal message %d: %v", k, err) } data := [][]byte{b} - results, err := ms.AppendRows(ctx, data, NoStreamOffset) + results, err := ms.AppendRows(ctx, data) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } @@ -358,7 +362,7 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq t.Errorf("failed to marshal message %d: %v", k, err) } data := [][]byte{b} - result, err = ms.AppendRows(ctx, data, NoStreamOffset) + result, err = ms.AppendRows(ctx, data, WithOffset(int64(k))) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } @@ -397,12 +401,19 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl t.Errorf("failed to marshal message %d: %v", k, err) } data := [][]byte{b} - result, err = ms.AppendRows(ctx, data, NoStreamOffset) + result, err = ms.AppendRows(ctx, data, WithOffset(int64(k))) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } + // be explicit about waiting/checking each response. + off, err := result.GetResult(ctx) + if err != nil { + t.Errorf("response %d error: %v", k, err) + } + if off != int64(k) { + t.Errorf("offset mismatch, got %d want %d", off, k) + } } - result.Ready() wantRows := int64(len(testSimpleData)) // Mark stream complete. @@ -468,7 +479,7 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq t.Errorf("failed to marshal message %d: %v", k, err) } data := [][]byte{b} - result, err = ms.AppendRows(ctx, data, NoStreamOffset) + result, err = ms.AppendRows(ctx, data) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } @@ -513,6 +524,85 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq } } +func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { + testTable := dataset.Table(tableIDs.New()) + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { + t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) + } + + m := &testdata.SimpleMessageProto2{} + descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) + + // setup a new stream. 
+ ms, err := mwClient.NewManagedStream(ctx, + WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), + WithType(CommittedStream), + WithSchemaDescriptor(descriptorProto), + ) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } + validateTableConstraints(ctx, t, bqClient, testTable, "before send", + withExactRowCount(0)) + + var result *AppendResult + for k, mesg := range testSimpleData { + b, err := proto.Marshal(mesg) + if err != nil { + t.Errorf("failed to marshal message %d: %v", k, err) + } + data := [][]byte{b} + result, err = ms.AppendRows(ctx, data) + if err != nil { + t.Errorf("single-row append %d failed: %v", k, err) + } + } + // wait for the result to indicate ready, then validate. + _, err = result.GetResult(ctx) + if err != nil { + t.Errorf("error on append: %v", err) + } + + validateTableConstraints(ctx, t, bqClient, testTable, "after send", + withExactRowCount(int64(len(testSimpleData)))) + + // Now, evolve the underlying table schema. + _, err = testTable.Update(ctx, bigquery.TableMetadataToUpdate{Schema: testdata.SimpleMessageEvolvedSchema}, "") + if err != nil { + t.Errorf("failed to evolve table schema: %v", err) + } + + // TODO: we need a more elegant mechanism for detecting when the backend has registered the schema change. + // In the continuous case, we'd get it from the response, but the change-and-wait case needs something more. + time.Sleep(6 * time.Second) + + // ready descriptor, send an additional append + m2 := &testdata.SimpleMessageEvolvedProto2{ + Name: proto.String("evolved"), + Value: proto.Int64(180), + Other: proto.String("hello evolution"), + } + descriptorProto = protodesc.ToDescriptorProto(m2.ProtoReflect().Descriptor()) + b, err := proto.Marshal(m2) + if err != nil { + t.Errorf("failed to marshal evolved message: %v", err) + } + result, err = ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto)) + if err != nil { + t.Errorf("failed evolved append: %v", err) + } + _, err = result.GetResult(ctx) + if err != nil { + t.Errorf("error on evolved append: %v", err) + } + + validateTableConstraints(ctx, t, bqClient, testTable, "after send", + withExactRowCount(int64(len(testSimpleData)+1)), + withNullCount("name", 0), + withNonNullCount("other", 1), + ) +} + func TestIntegration_DetectProjectID(t *testing.T) { ctx := context.Background() testCreds := testutil.Credentials(ctx) @@ -613,7 +703,7 @@ func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client, if err != nil { t.Fatalf("NewManagedStream: %v", err) } - result, err := ms.AppendRows(ctx, [][]byte{sampleRow}, NoStreamOffset) + result, err := ms.AppendRows(ctx, [][]byte{sampleRow}) if err != nil { t.Errorf("append failed: %v", err) } diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 2a419652e6c..7e26d53d2f8 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -178,7 +178,7 @@ func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) ( // getStream returns either a valid ARC client stream or permanent error. // // Calling getStream locks the mutex. 
-func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { +func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient, forceReconnect bool) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { ms.mu.Lock() defer ms.mu.Unlock() if ms.err != nil { @@ -190,9 +190,16 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient } // Always return the retained ARC if the arg differs. - if arc != ms.arc { + if arc != ms.arc && !forceReconnect { return ms.arc, ms.pending, nil } + if arc != ms.arc && forceReconnect && ms.arc != nil { + // In this case, we're forcing a close to apply changes to the stream + // that currently can't be modified on an established connection. + // + // TODO: clean this up once internal issue 205756033 is resolved. + (*ms.arc).CloseSend() + } ms.arc = new(storagepb.BigQueryWrite_AppendRowsClient) *ms.arc, ms.pending, ms.err = ms.openWithRetry() @@ -248,7 +255,7 @@ func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error var err error for { - arc, ch, err = ms.getStream(arc) + arc, ch, err = ms.getStream(arc, pw.newSchema != nil) if err != nil { return err } @@ -304,7 +311,7 @@ func (ms *ManagedStream) Close() error { var arc *storagepb.BigQueryWrite_AppendRowsClient - arc, ch, err := ms.getStream(arc) + arc, ch, err := ms.getStream(arc, false) if err != nil { return err } @@ -328,15 +335,30 @@ func (ms *ManagedStream) Close() error { // AppendRows sends the append requests to the service, and returns a single AppendResult for tracking // the set of data. // -// The format of the row data is binary serialized protocol buffer bytes, and and the message -// must adhere to the format of the schema Descriptor passed in when creating the managed stream. -func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) (*AppendResult, error) { - pw := newPendingWrite(data, offset) +// The format of the row data is binary serialized protocol buffer bytes. The message must be compatible +// with the schema currently set for the stream. +// +// Use the WithOffset() AppendOption to set an explicit offset for this append. Setting an offset for +// a default stream is unsupported. +func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...AppendOption) (*AppendResult, error) { + pw := newPendingWrite(data) + // apply AppendOption opts + for _, opt := range opts { + opt(pw) + } // check flow control if err := ms.fc.acquire(ctx, pw.reqSize); err != nil { // in this case, we didn't acquire, so don't pass the flow controller reference to avoid a release. pw.markDone(NoStreamOffset, err, nil) } + // if we've received an updated schema as part of a write, propagate it to both the cached schema and + // populate the schema in the request. + if pw.newSchema != nil { + ms.schemaDescriptor = pw.newSchema + pw.request.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{ + ProtoDescriptor: pw.newSchema, + } + } // proceed to call if err := ms.append(pw); err != nil { // pending write is DOA. 
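A minimal sketch of the variadic append surface above (ms is an existing
*ManagedStream, ctx a context.Context, rows a [][]byte of pre-serialized
proto messages, and evolvedDesc a hypothetical updated
*descriptorpb.DescriptorProto):

// Plain append; the service assigns offsets (e.g. default streams).
result, err := ms.AppendRows(ctx, rows)

// Append at an explicit offset (BUFFERED/PENDING streams).
result, err = ms.AppendRows(ctx, rows, WithOffset(42))

// Append that also carries an updated schema descriptor in-band.
result, err = ms.AppendRows(ctx, rows, UpdateSchemaDescriptor(evolvedDesc))
if err == nil {
	_, err = result.GetResult(ctx) // block until the write is acknowledged
}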
diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index 94d087c44ba..06a385e402c 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -127,7 +127,7 @@ func TestManagedStream_FirstAppendBehavior(t *testing.T) { wantReqs := 3 for i := 0; i < wantReqs; i++ { - _, err := ms.AppendRows(ctx, fakeData, NoStreamOffset) + _, err := ms.AppendRows(ctx, fakeData, WithOffset(int64(i))) if err != nil { t.Errorf("AppendRows; %v", err) } @@ -145,6 +145,14 @@ func TestManagedStream_FirstAppendBehavior(t *testing.T) { if v == nil { t.Errorf("request %d was nil", k) } + if v.GetOffset() == nil { + t.Errorf("request %d had no offset", k) + } else { + gotOffset := v.GetOffset().GetValue() + if gotOffset != int64(k) { + t.Errorf("request %d wanted offset %d, got %d", k, k, gotOffset) + } + } if k == 0 { if v.GetTraceId() == "" { t.Errorf("expected TraceId on first request, was empty") diff --git a/bigquery/storage/managedwriter/testdata/messages_proto2.pb.go b/bigquery/storage/managedwriter/testdata/messages_proto2.pb.go index d57ac20be31..b287799d885 100644 --- a/bigquery/storage/managedwriter/testdata/messages_proto2.pb.go +++ b/bigquery/storage/managedwriter/testdata/messages_proto2.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.25.0 -// protoc v3.10.1 +// protoc-gen-go v1.27.1 +// protoc v3.17.3 // source: messages_proto2.proto package testdata @@ -24,7 +24,6 @@ import ( reflect "reflect" sync "sync" - proto "github.com/golang/protobuf/proto" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) @@ -36,10 +35,6 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -// This is a compile-time assertion that a sufficiently up-to-date version -// of the legacy proto package is being used. -const _ = proto.ProtoPackageIsVersion4 - // SimpleMessage represents a simple message that transmits a string and int64 value. type SimpleMessageProto2 struct { state protoimpl.MessageState @@ -98,6 +93,72 @@ func (x *SimpleMessageProto2) GetValue() int64 { return 0 } +type SimpleMessageEvolvedProto2 struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // name is a simple scalar string. + Name *string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` + // value is a simple int64 value. + Value *int64 `protobuf:"varint,2,opt,name=value" json:"value,omitempty"` + // other is an additional string. 
+ Other *string `protobuf:"bytes,3,opt,name=other" json:"other,omitempty"` +} + +func (x *SimpleMessageEvolvedProto2) Reset() { + *x = SimpleMessageEvolvedProto2{} + if protoimpl.UnsafeEnabled { + mi := &file_messages_proto2_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SimpleMessageEvolvedProto2) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SimpleMessageEvolvedProto2) ProtoMessage() {} + +func (x *SimpleMessageEvolvedProto2) ProtoReflect() protoreflect.Message { + mi := &file_messages_proto2_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SimpleMessageEvolvedProto2.ProtoReflect.Descriptor instead. +func (*SimpleMessageEvolvedProto2) Descriptor() ([]byte, []int) { + return file_messages_proto2_proto_rawDescGZIP(), []int{1} +} + +func (x *SimpleMessageEvolvedProto2) GetName() string { + if x != nil && x.Name != nil { + return *x.Name + } + return "" +} + +func (x *SimpleMessageEvolvedProto2) GetValue() int64 { + if x != nil && x.Value != nil { + return *x.Value + } + return 0 +} + +func (x *SimpleMessageEvolvedProto2) GetOther() string { + if x != nil && x.Other != nil { + return *x.Other + } + return "" +} + type GithubArchiveEntityProto2 struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -113,7 +174,7 @@ type GithubArchiveEntityProto2 struct { func (x *GithubArchiveEntityProto2) Reset() { *x = GithubArchiveEntityProto2{} if protoimpl.UnsafeEnabled { - mi := &file_messages_proto2_proto_msgTypes[1] + mi := &file_messages_proto2_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -126,7 +187,7 @@ func (x *GithubArchiveEntityProto2) String() string { func (*GithubArchiveEntityProto2) ProtoMessage() {} func (x *GithubArchiveEntityProto2) ProtoReflect() protoreflect.Message { - mi := &file_messages_proto2_proto_msgTypes[1] + mi := &file_messages_proto2_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -139,7 +200,7 @@ func (x *GithubArchiveEntityProto2) ProtoReflect() protoreflect.Message { // Deprecated: Use GithubArchiveEntityProto2.ProtoReflect.Descriptor instead. 
func (*GithubArchiveEntityProto2) Descriptor() ([]byte, []int) { - return file_messages_proto2_proto_rawDescGZIP(), []int{1} + return file_messages_proto2_proto_rawDescGZIP(), []int{2} } func (x *GithubArchiveEntityProto2) GetId() int64 { @@ -190,7 +251,7 @@ type GithubArchiveRepoProto2 struct { func (x *GithubArchiveRepoProto2) Reset() { *x = GithubArchiveRepoProto2{} if protoimpl.UnsafeEnabled { - mi := &file_messages_proto2_proto_msgTypes[2] + mi := &file_messages_proto2_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -203,7 +264,7 @@ func (x *GithubArchiveRepoProto2) String() string { func (*GithubArchiveRepoProto2) ProtoMessage() {} func (x *GithubArchiveRepoProto2) ProtoReflect() protoreflect.Message { - mi := &file_messages_proto2_proto_msgTypes[2] + mi := &file_messages_proto2_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -216,7 +277,7 @@ func (x *GithubArchiveRepoProto2) ProtoReflect() protoreflect.Message { // Deprecated: Use GithubArchiveRepoProto2.ProtoReflect.Descriptor instead. func (*GithubArchiveRepoProto2) Descriptor() ([]byte, []int) { - return file_messages_proto2_proto_rawDescGZIP(), []int{2} + return file_messages_proto2_proto_rawDescGZIP(), []int{3} } func (x *GithubArchiveRepoProto2) GetId() int64 { @@ -260,7 +321,7 @@ type GithubArchiveMessageProto2 struct { func (x *GithubArchiveMessageProto2) Reset() { *x = GithubArchiveMessageProto2{} if protoimpl.UnsafeEnabled { - mi := &file_messages_proto2_proto_msgTypes[3] + mi := &file_messages_proto2_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -273,7 +334,7 @@ func (x *GithubArchiveMessageProto2) String() string { func (*GithubArchiveMessageProto2) ProtoMessage() {} func (x *GithubArchiveMessageProto2) ProtoReflect() protoreflect.Message { - mi := &file_messages_proto2_proto_msgTypes[3] + mi := &file_messages_proto2_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -286,7 +347,7 @@ func (x *GithubArchiveMessageProto2) ProtoReflect() protoreflect.Message { // Deprecated: Use GithubArchiveMessageProto2.ProtoReflect.Descriptor instead. 
func (*GithubArchiveMessageProto2) Descriptor() ([]byte, []int) { - return file_messages_proto2_proto_rawDescGZIP(), []int{3} + return file_messages_proto2_proto_rawDescGZIP(), []int{4} } func (x *GithubArchiveMessageProto2) GetType() string { @@ -361,46 +422,52 @@ var file_messages_proto2_proto_rawDesc = []byte{ 0x67, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c, - 0x75, 0x65, 0x22, 0x93, 0x01, 0x0a, 0x19, 0x47, 0x69, 0x74, 0x68, 0x75, 0x62, 0x41, 0x72, 0x63, + 0x75, 0x65, 0x22, 0x5c, 0x0a, 0x1a, 0x53, 0x69, 0x6d, 0x70, 0x6c, 0x65, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x45, 0x76, 0x6f, 0x6c, 0x76, 0x65, 0x64, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, + 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x74, + 0x68, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x74, 0x68, 0x65, 0x72, + 0x22, 0x93, 0x01, 0x0a, 0x19, 0x47, 0x69, 0x74, 0x68, 0x75, 0x62, 0x41, 0x72, 0x63, 0x68, 0x69, + 0x76, 0x65, 0x45, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, + 0x0a, 0x05, 0x6c, 0x6f, 0x67, 0x69, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6c, + 0x6f, 0x67, 0x69, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, 0x67, 0x72, 0x61, 0x76, 0x61, 0x74, 0x61, 0x72, + 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x67, 0x72, 0x61, 0x76, 0x61, + 0x74, 0x61, 0x72, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x61, 0x76, 0x61, 0x74, 0x61, 0x72, 0x5f, + 0x75, 0x72, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x76, 0x61, 0x74, 0x61, + 0x72, 0x55, 0x72, 0x6c, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x22, 0x4f, 0x0a, 0x17, 0x47, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x50, 0x72, 0x6f, 0x74, 0x6f, + 0x32, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02, 0x69, + 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x22, 0xd0, 0x02, 0x0a, 0x1a, 0x47, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x75, + 0x62, 0x6c, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x70, 0x75, 0x62, 0x6c, + 0x69, 0x63, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x35, 0x0a, 0x04, + 0x72, 0x65, 0x70, 0x6f, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x74, 0x65, 0x73, + 0x74, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x47, 0x69, 0x74, 0x68, 0x75, 0x62, 0x41, 0x72, 0x63, 0x68, + 0x69, 
0x76, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x52, 0x04, 0x72, + 0x65, 0x70, 0x6f, 0x12, 0x39, 0x0a, 0x05, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x47, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x45, 0x6e, 0x74, 0x69, 0x74, + 0x79, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x52, 0x05, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x35, + 0x0a, 0x03, 0x6f, 0x72, 0x67, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x74, 0x65, + 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x47, 0x69, 0x74, 0x68, 0x75, 0x62, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x45, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, - 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02, 0x69, 0x64, - 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x6f, 0x67, 0x69, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x6c, 0x6f, 0x67, 0x69, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, 0x67, 0x72, 0x61, 0x76, 0x61, 0x74, - 0x61, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x67, 0x72, 0x61, - 0x76, 0x61, 0x74, 0x61, 0x72, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x61, 0x76, 0x61, 0x74, 0x61, - 0x72, 0x5f, 0x75, 0x72, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x76, 0x61, - 0x74, 0x61, 0x72, 0x55, 0x72, 0x6c, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x05, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x22, 0x4f, 0x0a, 0x17, 0x47, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x50, 0x72, 0x6f, - 0x74, 0x6f, 0x32, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, - 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x22, 0xd0, 0x02, 0x0a, 0x1a, 0x47, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, - 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x70, 0x75, - 0x62, 0x6c, 0x69, 0x63, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x35, - 0x0a, 0x04, 0x72, 0x65, 0x70, 0x6f, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x74, - 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x47, 0x69, 0x74, 0x68, 0x75, 0x62, 0x41, 0x72, - 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x52, - 0x04, 0x72, 0x65, 0x70, 0x6f, 0x12, 0x39, 0x0a, 0x05, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x18, 0x05, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x2e, - 0x47, 0x69, 0x74, 0x68, 0x75, 0x62, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x45, 0x6e, 0x74, - 0x69, 0x74, 0x79, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x52, 0x05, 0x61, 0x63, 0x74, 0x6f, 0x72, - 0x12, 0x35, 0x0a, 0x03, 0x6f, 0x72, 0x67, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, - 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x47, 0x69, 0x74, 0x68, 0x75, 0x62, 0x41, - 0x72, 0x63, 0x68, 0x69, 0x76, 
0x65, 0x45, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x50, 0x72, 0x6f, 0x74, - 0x6f, 0x32, 0x52, 0x03, 0x6f, 0x72, 0x67, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, - 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x63, 0x72, 0x65, - 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x08, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x18, - 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x42, 0x3d, 0x5a, 0x3b, - 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x67, 0x6f, 0x2f, 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2f, 0x73, 0x74, 0x6f, - 0x72, 0x61, 0x67, 0x65, 0x2f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x64, 0x77, 0x72, 0x69, 0x74, - 0x65, 0x72, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, + 0x52, 0x03, 0x6f, 0x72, 0x67, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, + 0x5f, 0x61, 0x74, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, + 0x65, 0x64, 0x41, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x18, 0x09, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x42, 0x3d, 0x5a, 0x3b, 0x63, 0x6c, + 0x6f, 0x75, 0x64, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, + 0x6f, 0x2f, 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, + 0x67, 0x65, 0x2f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x64, 0x77, 0x72, 0x69, 0x74, 0x65, 0x72, + 0x2f, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, } var ( @@ -415,17 +482,18 @@ func file_messages_proto2_proto_rawDescGZIP() []byte { return file_messages_proto2_proto_rawDescData } -var file_messages_proto2_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_messages_proto2_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_messages_proto2_proto_goTypes = []interface{}{ (*SimpleMessageProto2)(nil), // 0: testdata.SimpleMessageProto2 - (*GithubArchiveEntityProto2)(nil), // 1: testdata.GithubArchiveEntityProto2 - (*GithubArchiveRepoProto2)(nil), // 2: testdata.GithubArchiveRepoProto2 - (*GithubArchiveMessageProto2)(nil), // 3: testdata.GithubArchiveMessageProto2 + (*SimpleMessageEvolvedProto2)(nil), // 1: testdata.SimpleMessageEvolvedProto2 + (*GithubArchiveEntityProto2)(nil), // 2: testdata.GithubArchiveEntityProto2 + (*GithubArchiveRepoProto2)(nil), // 3: testdata.GithubArchiveRepoProto2 + (*GithubArchiveMessageProto2)(nil), // 4: testdata.GithubArchiveMessageProto2 } var file_messages_proto2_proto_depIdxs = []int32{ - 2, // 0: testdata.GithubArchiveMessageProto2.repo:type_name -> testdata.GithubArchiveRepoProto2 - 1, // 1: testdata.GithubArchiveMessageProto2.actor:type_name -> testdata.GithubArchiveEntityProto2 - 1, // 2: testdata.GithubArchiveMessageProto2.org:type_name -> testdata.GithubArchiveEntityProto2 + 3, // 0: testdata.GithubArchiveMessageProto2.repo:type_name -> testdata.GithubArchiveRepoProto2 + 2, // 1: testdata.GithubArchiveMessageProto2.actor:type_name -> testdata.GithubArchiveEntityProto2 + 2, // 2: testdata.GithubArchiveMessageProto2.org:type_name -> testdata.GithubArchiveEntityProto2 3, // [3:3] is the sub-list for method output_type 3, // [3:3] is the sub-list for method input_type 3, // [3:3] is the sub-list for extension type_name @@ -452,7 +520,7 @@ func 
file_messages_proto2_proto_init() { } } file_messages_proto2_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GithubArchiveEntityProto2); i { + switch v := v.(*SimpleMessageEvolvedProto2); i { case 0: return &v.state case 1: @@ -464,7 +532,7 @@ func file_messages_proto2_proto_init() { } } file_messages_proto2_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GithubArchiveRepoProto2); i { + switch v := v.(*GithubArchiveEntityProto2); i { case 0: return &v.state case 1: @@ -476,6 +544,18 @@ func file_messages_proto2_proto_init() { } } file_messages_proto2_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GithubArchiveRepoProto2); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_messages_proto2_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GithubArchiveMessageProto2); i { case 0: return &v.state @@ -494,7 +574,7 @@ func file_messages_proto2_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_messages_proto2_proto_rawDesc, NumEnums: 0, - NumMessages: 4, + NumMessages: 5, NumExtensions: 0, NumServices: 0, }, diff --git a/bigquery/storage/managedwriter/testdata/messages_proto2.proto b/bigquery/storage/managedwriter/testdata/messages_proto2.proto index fdb2fb55e47..3881fdf518e 100644 --- a/bigquery/storage/managedwriter/testdata/messages_proto2.proto +++ b/bigquery/storage/managedwriter/testdata/messages_proto2.proto @@ -21,11 +21,21 @@ option go_package = "cloud.google.com/go/bigquery/storage/managedwriter/testdata message SimpleMessageProto2 { // name is a simple scalar string. optional string name = 1; + // value is a simple int64 value. optional int64 value = 2; } +message SimpleMessageEvolvedProto2 { + // name is a simple scalar string. + optional string name = 1; + // value is a simple int64 value. + optional int64 value = 2; + + // other is an additional string. + optional string other = 3; +} message GithubArchiveEntityProto2 { optional int64 id = 1; diff --git a/bigquery/storage/managedwriter/testdata/schemas.go b/bigquery/storage/managedwriter/testdata/schemas.go index 74f230414ec..5ddda986a6b 100644 --- a/bigquery/storage/managedwriter/testdata/schemas.go +++ b/bigquery/storage/managedwriter/testdata/schemas.go @@ -22,6 +22,12 @@ var ( {Name: "value", Type: bigquery.IntegerFieldType}, } + SimpleMessageEvolvedSchema bigquery.Schema = bigquery.Schema{ + {Name: "name", Type: bigquery.StringFieldType, Required: true}, + {Name: "value", Type: bigquery.IntegerFieldType}, + {Name: "other", Type: bigquery.StringFieldType}, + } + GithubArchiveSchema bigquery.Schema = bigquery.Schema{ {Name: "type", Type: bigquery.StringFieldType}, {Name: "public", Type: bigquery.BooleanFieldType}, diff --git a/bigquery/storage/managedwriter/testutils_test.go b/bigquery/storage/managedwriter/testutils_test.go index 439d958d05e..e6131b73055 100644 --- a/bigquery/storage/managedwriter/testutils_test.go +++ b/bigquery/storage/managedwriter/testutils_test.go @@ -126,23 +126,23 @@ func withExactRowCount(totalRows int64) constraintOption { } // withNullCount asserts the number of null values in a column. 
-func withNullCount(colname string, nullcount int64) constraintOption {
+func withNullCount(colname string, nullCount int64) constraintOption {
 	return func(vi *validationInfo) {
 		resultCol := fmt.Sprintf("nullcol_count_%s", colname)
 		vi.constraints[resultCol] = &constraint{
-			projection:    fmt.Sprintf("COUNTIF(ISNULL(%s)) AS %s", colname, resultCol),
-			expectedValue: nullcount,
+			projection:    fmt.Sprintf("SUM(IF(%s IS NULL,1,0)) AS %s", colname, resultCol),
+			expectedValue: nullCount,
 		}
 	}
 }

 // withNonNullCount asserts the number of non-null values in a column.
-func withNonNullCount(colname string, nullcount int64) constraintOption {
+func withNonNullCount(colname string, nonNullCount int64) constraintOption {
 	return func(vi *validationInfo) {
 		resultCol := fmt.Sprintf("nonnullcol_count_%s", colname)
 		vi.constraints[resultCol] = &constraint{
-			projection:    fmt.Sprintf("COUNTIF(NOT ISNULL(%s)) AS %s", colname, resultCol),
-			expectedValue: nullcount,
+			projection:    fmt.Sprintf("SUM(IF(%s IS NOT NULL,1,0)) AS %s", colname, resultCol),
+			expectedValue: nonNullCount,
 		}
 	}
 }

From 46489f4c8a634068a3e7cf2fd5e5ca11b555c0a8 Mon Sep 17 00:00:00 2001
From: Brenna N Epp
Date: Wed, 1 Dec 2021 21:51:46 +0000
Subject: [PATCH 5/7] feat(storage): GenerateSignedPostPolicyV4 can use existing creds to authenticate (#5105)

---
 storage/bucket.go           | 50 +++++++++++++++++++++++-
 storage/integration_test.go | 78 +++++++++++++++++++++++++++++++++++--
 storage/post_policy_v4.go   | 15 +++++++
 3 files changed, 138 insertions(+), 5 deletions(-)

diff --git a/storage/bucket.go b/storage/bucket.go
index ec7dcb5c322..93221c5a75c 100644
--- a/storage/bucket.go
+++ b/storage/bucket.go
@@ -282,8 +282,54 @@ func (b *BucketHandle) SignedURL(object string, opts *SignedURLOptions) (string,
 	return SignedURL(b.name, object, newopts)
 }

-// TODO: Add a similar wrapper for GenerateSignedPostPolicyV4 allowing users to
-// omit PrivateKey/SignBytes
+// GenerateSignedPostPolicyV4 generates a PostPolicyV4 value from bucket, object and opts.
+// The generated URL and fields will then allow an unauthenticated client to perform multipart uploads.
+//
+// This method only requires the Expires field in the specified PostPolicyV4Options
+// to be non-zero. If the GoogleAccessID and PrivateKey are not provided, it attempts
+// to fill them from the GOOGLE_APPLICATION_CREDENTIALS environment variable.
+// If you are authenticating with a custom HTTP client, service-account-based
+// auto-detection may not work.
+//
+// If no private key is found, it attempts to use the GoogleAccessID to sign the
+// post policy. This requires the IAM Service Account Credentials API to be enabled
+// (https://console.developers.google.com/apis/api/iamcredentials.googleapis.com/overview)
+// and iam.serviceAccounts.signBlob permissions on the GoogleAccessID service account.
+// If you do not want these fields set for you, you may pass them in through opts or use
+// GenerateSignedPostPolicyV4(bucket, name string, opts *PostPolicyV4Options) instead.
+func (b *BucketHandle) GenerateSignedPostPolicyV4(object string, opts *PostPolicyV4Options) (*PostPolicyV4, error) {
+	if opts.GoogleAccessID != "" && (opts.SignRawBytes != nil || opts.SignBytes != nil || len(opts.PrivateKey) > 0) {
+		return GenerateSignedPostPolicyV4(b.name, object, opts)
+	}
+	// Make a copy of opts so we don't modify the pointer parameter.
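+	// The remaining steps fall back in this order: detect a GoogleAccessID
+	// from the environment, parse a private key out of the client's JSON
+	// credentials, and finally use the IAM-based signing function for the
+	// service account.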
+	newopts := opts.clone()
+
+	if newopts.GoogleAccessID == "" {
+		id, err := b.detectDefaultGoogleAccessID()
+		if err != nil {
+			return nil, err
+		}
+		newopts.GoogleAccessID = id
+	}
+	if newopts.SignBytes == nil && newopts.SignRawBytes == nil && len(newopts.PrivateKey) == 0 {
+		if b.c.creds != nil && len(b.c.creds.JSON) > 0 {
+			var sa struct {
+				PrivateKey string `json:"private_key"`
+			}
+			err := json.Unmarshal(b.c.creds.JSON, &sa)
+			if err == nil && sa.PrivateKey != "" {
+				newopts.PrivateKey = []byte(sa.PrivateKey)
+			}
+		}
+
+		// Don't error out if we can't unmarshal the private key from the client;
+		// fall back to the default sign function for the service account.
+		if len(newopts.PrivateKey) == 0 {
+			newopts.SignRawBytes = b.defaultSignBytesFunc(newopts.GoogleAccessID)
+		}
+	}
+	return GenerateSignedPostPolicyV4(b.name, object, newopts)
+}

 func (b *BucketHandle) detectDefaultGoogleAccessID() (string, error) {
 	returnErr := errors.New("no credentials found on client and not on GCE (Google Compute Engine)")
diff --git a/storage/integration_test.go b/storage/integration_test.go
index 6d3567d0c2a..b0ebf509643 100644
--- a/storage/integration_test.go
+++ b/storage/integration_test.go
@@ -4142,7 +4142,7 @@ func TestIntegration_PostPolicyV4(t *testing.T) {
 	object := b.Object(objectName)
 	defer h.mustDeleteObject(object)

-	pv4, err := GenerateSignedPostPolicyV4(newBucketName, objectName, opts)
+	pv4, err := b.GenerateSignedPostPolicyV4(objectName, opts)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -4248,6 +4248,78 @@ func TestIntegration_SignedURL_Bucket(t *testing.T) {
 	}
 }

+func TestIntegration_PostPolicyV4_Bucket(t *testing.T) {
+	h := testHelper{t}
+	ctx := context.Background()
+
+	if testing.Short() && !replaying {
+		t.Skip("Integration tests skipped in short mode")
+	}
+
+	// We explicitly send the key to this client so that it signs with the private key.
+	clientWithCredentials := newTestClientWithExplicitCredentials(ctx, t)
+	defer clientWithCredentials.Close()
+
+	// Create another client to test the default sign-bytes function as well.
+	clientWithoutPrivateKey := testConfig(ctx, t, ScopeFullControl, "https://www.googleapis.com/auth/cloud-platform")
+	defer clientWithoutPrivateKey.Close()
+
+	jwt, err := testutil.JWTConfig()
+	if err != nil {
+		t.Fatalf("unable to find test credentials: %v", err)
+	}
+
+	statusCodeToRespond := 200
+
+	for _, test := range []struct {
+		desc   string
+		opts   PostPolicyV4Options
+		client *Client
+	}{
+		{
+			desc: "signing with the private key",
+			opts: PostPolicyV4Options{
+				Expires: time.Now().Add(30 * time.Minute),
+
+				Fields: &PolicyV4Fields{
+					StatusCodeOnSuccess: statusCodeToRespond,
+					ContentType:         "text/plain",
+					ACL:                 "public-read",
+				},
+			},
+			client: clientWithCredentials,
+		},
+		{
+			desc: "signing with the default sign bytes func",
+			opts: PostPolicyV4Options{
+				Expires:        time.Now().Add(30 * time.Minute),
+				GoogleAccessID: jwt.Email,
+				Fields: &PolicyV4Fields{
+					StatusCodeOnSuccess: statusCodeToRespond,
+					ContentType:         "text/plain",
+					ACL:                 "public-read",
+				},
+			},
+			client: clientWithoutPrivateKey,
+		},
+	} {
+		t.Run(test.desc, func(t *testing.T) {
+			objectName := uidSpace.New()
+			object := test.client.Bucket(bucketName).Object(objectName)
+			defer h.mustDeleteObject(object)
+
+			pv4, err := test.client.Bucket(bucketName).GenerateSignedPostPolicyV4(objectName, &test.opts)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			if err := verifyPostPolicy(pv4, object, bytes.Repeat([]byte("a"), 25), statusCodeToRespond); err != nil {
+				t.Fatal(err)
+			}
+		})
+	}
+}
+
 // Tests that the same SignBytes function works for both
 // SignRawBytes on GenerateSignedPostPolicyV4 and SignBytes on SignedURL
 func TestIntegration_PostPolicyV4_SignedURL_WithSignBytes(t *testing.T) {
@@ -4263,7 +4335,7 @@ func TestIntegration_PostPolicyV4_SignedURL_WithSignBytes(t *testing.T) {
 	h := testHelper{t}
 	projectID := testutil.ProjID()
 	bucketName := uidSpace.New()
-	objectName := "my-object.txt"
+	objectName := uidSpace.New()
 	fileBody := bytes.Repeat([]byte("b"), 25)
 	bucket := client.Bucket(bucketName)
@@ -4491,7 +4563,7 @@ func (h testHelper) mustObjectAttrs(o *ObjectHandle) *ObjectAttrs {

 func (h testHelper) mustDeleteObject(o *ObjectHandle) {
 	if err := o.Delete(context.Background()); err != nil {
-		h.t.Fatalf("%s: object delete: %v", loc(), err)
+		h.t.Fatalf("%s: delete object %s from bucket %s: %v", loc(), o.ObjectName(), o.BucketName(), err)
 	}
 }
diff --git a/storage/post_policy_v4.go b/storage/post_policy_v4.go
index 5f418c3246b..7e972101015 100644
--- a/storage/post_policy_v4.go
+++ b/storage/post_policy_v4.go
@@ -116,6 +116,21 @@ type PostPolicyV4Options struct {
 	shouldHashSignBytes bool
 }

+func (opts *PostPolicyV4Options) clone() *PostPolicyV4Options {
+	return &PostPolicyV4Options{
+		GoogleAccessID:      opts.GoogleAccessID,
+		PrivateKey:          opts.PrivateKey,
+		SignBytes:           opts.SignBytes,
+		SignRawBytes:        opts.SignRawBytes,
+		Expires:             opts.Expires,
+		Style:               opts.Style,
+		Insecure:            opts.Insecure,
+		Fields:              opts.Fields,
+		Conditions:          opts.Conditions,
+		shouldHashSignBytes: opts.shouldHashSignBytes,
+	}
+}
+
 // PolicyV4Fields describes the attributes for a PostPolicyV4 request.
 type PolicyV4Fields struct {
 	// ACL specifies the access control permissions for the object.

From 359f5b1ca118ff6f92603da083eb943b672ed779 Mon Sep 17 00:00:00 2001
From: shollyman
Date: Wed, 1 Dec 2021 16:27:47 -0800
Subject: [PATCH 6/7] chore(bigquery): release 1.25.0 (#5177)

Release-As: 1.25.0
---
 bigquery/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bigquery/README.md b/bigquery/README.md
index ab52b629891..94f5c6d306a 100644
--- a/bigquery/README.md
+++ b/bigquery/README.md
@@ -1,5 +1,5 @@
 ## BigQuery [![Go Reference](https://pkg.go.dev/badge/cloud.google.com/go/bigquery.svg)](https://pkg.go.dev/cloud.google.com/go/bigquery)
-
+
 - [About BigQuery](https://cloud.google.com/bigquery/)
 - [API documentation](https://cloud.google.com/bigquery/docs)
 - [Go client documentation](https://pkg.go.dev/cloud.google.com/go/bigquery)

From b2b54767d45a2b00e005a2bce0d7c8b15ad3605e Mon Sep 17 00:00:00 2001
From: Brenna N Epp
Date: Thu, 2 Dec 2021 20:04:55 +0000
Subject: [PATCH 7/7] feat(storage): add retry config to BucketHandle (#5170)

* Adds Retryer configurability to BucketHandle

* Adds retrying to BucketHandle.Update, including defaulting to retry only when idempotency conditions are present

* Adds retry config to all direct methods on BucketHandle

* Adds integration test for retry configs

Bucket config will merge with object config, with the object's config overriding the options it sets.
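As a rough, illustrative sketch (not part of this patch) of how the layered
retryers might be combined; the bucket and object names are placeholders and
the client is assumed to use default credentials:

	package main

	import (
		"context"
		"log"
		"time"

		"cloud.google.com/go/storage"
		gax "github.com/googleapis/gax-go/v2"
	)

	func main() {
		ctx := context.Background()
		client, err := storage.NewClient(ctx)
		if err != nil {
			log.Fatal(err)
		}
		defer client.Close()

		// Bucket-level retry configuration: operations on this handle and
		// on object handles derived from it start from this config.
		bkt := client.Bucket("my-bucket").Retryer(
			storage.WithBackoff(gax.Backoff{Initial: 2 * time.Second}),
			storage.WithPolicy(storage.RetryAlways),
		)

		// The object handle keeps the bucket's backoff, but the policy set
		// here takes precedence over the bucket's RetryAlways.
		obj := bkt.Object("my-object").Retryer(storage.WithPolicy(storage.RetryNever))

		if err := obj.Delete(ctx); err != nil {
			log.Printf("delete: %v", err)
		}
	}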
--- storage/bucket.go | 46 +++++++++--- storage/bucket_test.go | 88 +++++++++++++++++++++++ storage/storage.go | 32 ++++++++- storage/storage_test.go | 150 +++++++++++++++++++++++++++++++++++++++- 4 files changed, 304 insertions(+), 12 deletions(-) diff --git a/storage/bucket.go b/storage/bucket.go index 93221c5a75c..42888cc49c5 100644 --- a/storage/bucket.go +++ b/storage/bucket.go @@ -44,6 +44,7 @@ type BucketHandle struct { defaultObjectACL ACLHandle conds *BucketConditions userProject string // project for Requester Pays buckets + retry *retryConfig } // Bucket returns a BucketHandle, which provides operations on the named bucket. @@ -95,7 +96,7 @@ func (b *BucketHandle) Create(ctx context.Context, projectID string, attrs *Buck if attrs != nil && attrs.PredefinedDefaultObjectACL != "" { req.PredefinedDefaultObjectAcl(attrs.PredefinedDefaultObjectACL) } - return runWithRetry(ctx, func() error { _, err := req.Context(ctx).Do(); return err }) + return run(ctx, func() error { _, err := req.Context(ctx).Do(); return err }, b.retry, true) } // Delete deletes the Bucket. @@ -107,7 +108,8 @@ func (b *BucketHandle) Delete(ctx context.Context) (err error) { if err != nil { return err } - return runWithRetry(ctx, func() error { return req.Context(ctx).Do() }) + + return run(ctx, func() error { return req.Context(ctx).Do() }, b.retry, true) } func (b *BucketHandle) newDeleteCall() (*raw.BucketsDeleteCall, error) { @@ -156,6 +158,7 @@ func (b *BucketHandle) Object(name string) *ObjectHandle { }, gen: -1, userProject: b.userProject, + retry: b.retry.clone(), } } @@ -169,10 +172,10 @@ func (b *BucketHandle) Attrs(ctx context.Context) (attrs *BucketAttrs, err error return nil, err } var resp *raw.Bucket - err = runWithRetry(ctx, func() error { + err = run(ctx, func() error { resp, err = req.Context(ctx).Do() return err - }) + }, b.retry, true) var e *googleapi.Error if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound { return nil, ErrBucketNotExist @@ -210,12 +213,20 @@ func (b *BucketHandle) Update(ctx context.Context, uattrs BucketAttrsToUpdate) ( if uattrs.PredefinedDefaultObjectACL != "" { req.PredefinedDefaultObjectAcl(uattrs.PredefinedDefaultObjectACL) } - // TODO(jba): retry iff metagen is set? - rb, err := req.Context(ctx).Do() - if err != nil { + + isIdempotent := b.conds != nil && b.conds.MetagenerationMatch != 0 + + var rawBucket *raw.Bucket + call := func() error { + rb, err := req.Context(ctx).Do() + rawBucket = rb + return err + } + + if err := run(ctx, call, b.retry, isIdempotent); err != nil { return nil, err } - return newBucket(rb) + return newBucket(rawBucket) } func (b *BucketHandle) newPatchCall(uattrs *BucketAttrsToUpdate) (*raw.BucketsPatchCall, error) { @@ -1127,10 +1138,10 @@ func (b *BucketHandle) LockRetentionPolicy(ctx context.Context) error { metageneration = b.conds.MetagenerationMatch } req := b.c.raw.Buckets.LockRetentionPolicy(b.name, metageneration) - return runWithRetry(ctx, func() error { + return run(ctx, func() error { _, err := req.Context(ctx).Do() return err - }) + }, b.retry, true) } // applyBucketConds modifies the provided call using the conditions in conds. @@ -1413,6 +1424,21 @@ func (b *BucketHandle) Objects(ctx context.Context, q *Query) *ObjectIterator { return it } +// Retryer returns a bucket handle that is configured with custom retry +// behavior as specified by the options that are passed to it. All operations +// on the new handle will use the customized retry configuration. 
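+// Object handles derived from this bucket via BucketHandle.Object inherit a
+// copy of this configuration.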
+// Retry options set on an object handle will take precedence over options set on
+// the bucket handle.
+func (b *BucketHandle) Retryer(opts ...RetryOption) *BucketHandle {
+	b2 := *b
+	retry := &retryConfig{}
+	for _, opt := range opts {
+		opt.apply(retry)
+	}
+	b2.retry = retry
+	return &b2
+}
+
 // An ObjectIterator is an iterator over ObjectAttrs.
 //
 // Note: This iterator is not safe for concurrent operations without explicit synchronization.
diff --git a/storage/bucket_test.go b/storage/bucket_test.go
index 0e65e8becdf..2b4491766fc 100644
--- a/storage/bucket_test.go
+++ b/storage/bucket_test.go
@@ -23,6 +23,7 @@ import (
 	"cloud.google.com/go/internal/testutil"
 	"github.com/google/go-cmp/cmp"
+	gax "github.com/googleapis/gax-go/v2"
 	"google.golang.org/api/googleapi"
 	raw "google.golang.org/api/storage/v1"
 )
@@ -717,3 +718,90 @@ func TestNewBucket(t *testing.T) {
 		t.Errorf("got=-, want=+:\n%s", diff)
 	}
 }
+
+func TestBucketRetryer(t *testing.T) {
+	testCases := []struct {
+		name string
+		call func(b *BucketHandle) *BucketHandle
+		want *retryConfig
+	}{
+		{
+			name: "all defaults",
+			call: func(b *BucketHandle) *BucketHandle {
+				return b.Retryer()
+			},
+			want: &retryConfig{},
+		},
+		{
+			name: "set all options",
+			call: func(b *BucketHandle) *BucketHandle {
+				return b.Retryer(
+					WithBackoff(gax.Backoff{
+						Initial:    2 * time.Second,
+						Max:        30 * time.Second,
+						Multiplier: 3,
+					}),
+					WithPolicy(RetryAlways),
+					WithErrorFunc(func(err error) bool { return false }))
+			},
+			want: &retryConfig{
+				backoff: &gax.Backoff{
+					Initial:    2 * time.Second,
+					Max:        30 * time.Second,
+					Multiplier: 3,
+				},
+				policy:      RetryAlways,
+				shouldRetry: func(err error) bool { return false },
+			},
+		},
+		{
+			name: "set some backoff options",
+			call: func(b *BucketHandle) *BucketHandle {
+				return b.Retryer(
+					WithBackoff(gax.Backoff{
+						Multiplier: 3,
+					}))
+			},
+			want: &retryConfig{
+				backoff: &gax.Backoff{
+					Multiplier: 3,
+				}},
+		},
+		{
+			name: "set policy only",
+			call: func(b *BucketHandle) *BucketHandle {
+				return b.Retryer(WithPolicy(RetryNever))
+			},
+			want: &retryConfig{
+				policy: RetryNever,
+			},
+		},
+		{
+			name: "set ErrorFunc only",
+			call: func(b *BucketHandle) *BucketHandle {
+				return b.Retryer(
+					WithErrorFunc(func(err error) bool { return false }))
+			},
+			want: &retryConfig{
+				shouldRetry: func(err error) bool { return false },
+			},
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(s *testing.T) {
+			b := tc.call(&BucketHandle{})
+			if diff := cmp.Diff(
+				b.retry,
+				tc.want,
+				cmp.AllowUnexported(retryConfig{}, gax.Backoff{}),
+				// ErrorFunc cannot be compared directly, but we check if both are
+				// either nil or non-nil.
+				cmp.Comparer(func(a, b func(err error) bool) bool {
+					return (a == nil && b == nil) || (a != nil && b != nil)
+				}),
+			); diff != "" {
+				s.Fatalf("retry not configured correctly: %v", diff)
+			}
+		})
+	}
+}
diff --git a/storage/storage.go b/storage/storage.go
index 798dd4598eb..07ba64cc9e9 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -1790,9 +1790,18 @@ func setConditionField(call reflect.Value, name string, value interface{}) bool
 // Retryer returns an object handle that is configured with custom retry
 // behavior as specified by the options that are passed to it. All operations
 // on the new handle will use the customized retry configuration.
+// These retry options will merge with the bucket's retryer (if set) for the
+// returned handle. Options passed into this method will take precedence over
+// options on the bucket's retryer.
func (o *ObjectHandle) Retryer(opts ...RetryOption) *ObjectHandle { o2 := *o - retry := &retryConfig{} + var retry *retryConfig + if o.retry != nil { + // merge the options with the existing retry + retry = o.retry + } else { + retry = &retryConfig{} + } for _, opt := range opts { opt.apply(retry) } @@ -1897,6 +1906,27 @@ type retryConfig struct { shouldRetry func(err error) bool } +func (r *retryConfig) clone() *retryConfig { + if r == nil { + return nil + } + + var bo *gax.Backoff + if r.backoff != nil { + bo = &gax.Backoff{ + Initial: r.backoff.Initial, + Max: r.backoff.Max, + Multiplier: r.backoff.Multiplier, + } + } + + return &retryConfig{ + backoff: bo, + policy: r.policy, + shouldRetry: r.shouldRetry, + } +} + // composeSourceObj wraps a *raw.ComposeRequestSourceObjects, but adds the methods // that modifyCall searches for by name. type composeSourceObj struct { diff --git a/storage/storage_test.go b/storage/storage_test.go index dafc9c3d22c..b4f7a4ef429 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -788,7 +788,7 @@ func TestConditionErrors(t *testing.T) { // Test that ObjectHandle.Retryer correctly configures the retry configuration // in the ObjectHandle. -func TestRetryer(t *testing.T) { +func TestObjectRetryer(t *testing.T) { testCases := []struct { name string call func(o *ObjectHandle) *ObjectHandle @@ -875,6 +875,154 @@ func TestRetryer(t *testing.T) { } } +// Test the interactions between ObjectHandle and BucketHandle Retryers and that +// they correctly configure the retry configuration +func TestRetryer(t *testing.T) { + testCases := []struct { + name string + bucketOptions []RetryOption + objectOptions []RetryOption + want *retryConfig + }{ + { + name: "no retries", + want: nil, + }, + { + name: "object retryer configures retry", + objectOptions: []RetryOption{ + WithPolicy(RetryAlways), + WithErrorFunc(shouldRetry), + }, + want: &retryConfig{ + shouldRetry: shouldRetry, + policy: RetryAlways, + }, + }, + { + name: "bucket retryer configures retry", + bucketOptions: []RetryOption{ + WithBackoff(gax.Backoff{ + Initial: time.Minute, + Max: time.Hour, + Multiplier: 6, + }), + WithPolicy(RetryAlways), + WithErrorFunc(shouldRetry), + }, + want: &retryConfig{ + backoff: &gax.Backoff{ + Initial: time.Minute, + Max: time.Hour, + Multiplier: 6, + }, + shouldRetry: shouldRetry, + policy: RetryAlways, + }, + }, + { + name: "object retryer overrides bucket retryer", + bucketOptions: []RetryOption{ + WithPolicy(RetryAlways), + }, + objectOptions: []RetryOption{ + WithPolicy(RetryNever), + WithErrorFunc(shouldRetry), + }, + want: &retryConfig{ + policy: RetryNever, + shouldRetry: shouldRetry, + }, + }, + { + name: "object retryer overrides bucket retryer backoff options", + bucketOptions: []RetryOption{ + WithBackoff(gax.Backoff{ + Initial: time.Minute, + Max: time.Hour, + Multiplier: 6, + }), + }, + objectOptions: []RetryOption{ + WithBackoff(gax.Backoff{ + Initial: time.Nanosecond, + Max: time.Microsecond, + }), + }, + want: &retryConfig{ + backoff: &gax.Backoff{ + Initial: time.Nanosecond, + Max: time.Microsecond, + }, + }, + }, + { + name: "object retryer does not override bucket retryer if option is not set", + bucketOptions: []RetryOption{ + WithPolicy(RetryNever), + WithErrorFunc(shouldRetry), + }, + objectOptions: []RetryOption{ + WithBackoff(gax.Backoff{ + Initial: time.Nanosecond, + Max: time.Second, + }), + }, + want: &retryConfig{ + policy: RetryNever, + shouldRetry: shouldRetry, + backoff: &gax.Backoff{ + Initial: time.Nanosecond, + Max: 
time.Second, + }, + }, + }, + { + name: "object's backoff completely overwrites bucket's backoff", + bucketOptions: []RetryOption{ + WithBackoff(gax.Backoff{ + Initial: time.Hour, + }), + }, + objectOptions: []RetryOption{ + WithBackoff(gax.Backoff{ + Multiplier: 4, + }), + }, + want: &retryConfig{ + backoff: &gax.Backoff{ + Multiplier: 4, + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(s *testing.T) { + b := &BucketHandle{} + if len(tc.bucketOptions) > 0 { + b = b.Retryer(tc.bucketOptions...) + } + o := b.Object("obj") + if len(tc.objectOptions) > 0 { + o = o.Retryer(tc.objectOptions...) + } + + if diff := cmp.Diff( + o.retry, + tc.want, + cmp.AllowUnexported(retryConfig{}, gax.Backoff{}), + // ErrorFunc cannot be compared directly, but we check if both are + // either nil or non-nil. + cmp.Comparer(func(a, b func(err error) bool) bool { + return (a == nil && b == nil) || (a != nil && b != nil) + }), + ); diff != "" { + s.Fatalf("retry not configured correctly: %v", diff) + } + }) + } +} + // Test object compose. func TestObjectCompose(t *testing.T) { t.Parallel()