diff --git a/bigquery/dataset.go b/bigquery/dataset.go index 91b649eed8c..7f9115a5c2f 100644 --- a/bigquery/dataset.go +++ b/bigquery/dataset.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "strings" "time" "cloud.google.com/go/internal/optional" @@ -88,6 +89,25 @@ func (c *Client) DatasetInProject(projectID, datasetID string) *Dataset { } } +// Identifier returns the ID of the dataset in the requested format. +// +// For Standard SQL format, the identifier will be quoted if the +// ProjectID contains dash (-) characters. +func (d *Dataset) Identifier(f IdentifierFormat) (string, error) { + switch f { + case LegacySQLID: + return fmt.Sprintf("%s:%s", d.ProjectID, d.DatasetID), nil + case StandardSQLID: + // Quote project identifiers if they have a dash character. + if strings.Contains(d.ProjectID, "-") { + return fmt.Sprintf("`%s`.%s", d.ProjectID, d.DatasetID), nil + } + return fmt.Sprintf("%s.%s", d.ProjectID, d.DatasetID), nil + default: + return "", ErrUnknownIdentifierFormat + } +} + // Create creates a dataset in the BigQuery service. An error will be returned if the // dataset already exists. Pass in a DatasetMetadata value to configure the dataset. func (d *Dataset) Create(ctx context.Context, md *DatasetMetadata) (err error) { diff --git a/bigquery/dataset_test.go b/bigquery/dataset_test.go index 351947b73e9..fb4d3ea2ee2 100644 --- a/bigquery/dataset_test.go +++ b/bigquery/dataset_test.go @@ -476,3 +476,63 @@ func TestConvertAccessEntry(t *testing.T) { t.Error("got nil, want error") } } + +func TestDatasetIdentifiers(t *testing.T) { + testDataset := &Dataset{ + ProjectID: "p", + DatasetID: "d", + c: nil, + } + for _, tc := range []struct { + description string + in *Dataset + format IdentifierFormat + want string + wantErr bool + }{ + { + description: "empty format string", + in: testDataset, + format: "", + wantErr: true, + }, + { + description: "legacy", + in: testDataset, + format: LegacySQLID, + want: "p:d", + }, + { + description: "standard unquoted", + in: testDataset, + format: StandardSQLID, + want: "p.d", + }, + { + description: "standard w/quoting", + in: &Dataset{ProjectID: "p-p", DatasetID: "d"}, + format: StandardSQLID, + want: "`p-p`.d", + }, + { + description: "api resource", + in: testDataset, + format: StorageAPIResourceID, + wantErr: true, + }, + } { + got, err := tc.in.Identifier(tc.format) + if tc.wantErr && err == nil { + t.Errorf("case %q: wanted err, was success", tc.description) + } + if !tc.wantErr { + if err != nil { + t.Errorf("case %q: wanted success, got err: %v", tc.description, err) + } else { + if got != tc.want { + t.Errorf("case %q: got %s, want %s", tc.description, got, tc.want) + } + } + } + } +} diff --git a/bigquery/go.mod b/bigquery/go.mod index d54a0d1aaf4..a8c9246ce49 100644 --- a/bigquery/go.mod +++ b/bigquery/go.mod @@ -12,8 +12,8 @@ require ( go.opencensus.io v0.23.0 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 - google.golang.org/api v0.60.0 - google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 + google.golang.org/api v0.61.0 + google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12 google.golang.org/grpc v1.40.0 google.golang.org/protobuf v1.27.1 ) diff --git a/bigquery/go.sum b/bigquery/go.sum index dfcf3b31b33..7ea9f942a43 100644 --- a/bigquery/go.sum +++ b/bigquery/go.sum @@ -289,8 +289,8 @@ golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1 h1:B333XXssMuKQeBwiNODx4TupZy7bf4sxFZnN2ZOcvUE= -golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 h1:RerP+noqYHUQ8CMRcPlC2nvTa4dcBIjegkuWdcUDuqg= +golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -348,8 +348,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 h1:2B5p2L5IfGiD7+b9BOoRMC6DgObAVZV+Fsp050NqXik= -golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 h1:TyHqChC80pFkXWraUUf6RuB5IqFdQieMLwwCJokV2pc= +golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -448,8 +448,8 @@ google.golang.org/api v0.54.0/go.mod h1:7C4bFFOvVDGXjfDTAsgGwDgAxRDeQ4X8NvUedIt6 google.golang.org/api v0.55.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE= google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE= google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdrMgI= -google.golang.org/api v0.60.0 h1:eq/zs5WPH4J9undYM9IP1O7dSr7Yh8Y0GtSCpzGzIUk= -google.golang.org/api v0.60.0/go.mod h1:d7rl65NZAkEQ90JFzqBjcRq1TVeG5ZoGV3sSpEnnVb4= +google.golang.org/api v0.61.0 h1:TXXKS1slM3b2bZNJwD5DV/Tp6/M2cLzLOLh9PjDhrw8= +google.golang.org/api v0.61.0/go.mod h1:xQRti5UdCmoCEqFxcz93fTl338AVqDgyaDRuOZ3hg9I= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -515,9 +515,9 @@ google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEc google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210921142501-181ce0d877f6/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= -google.golang.org/genproto v0.0.0-20211021150943-2b146023228c/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= -google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 h1:b9mVrqYfq3P4bCdaLg1qtBnPzUYgglsIdjZkL/fQVOE= google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12 h1:DN5b3HU13J4sMd/QjDx34U6afpaexKTDdop+26pdjdk= +google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index 766c56639ae..bc8b96a137d 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -371,12 +371,12 @@ func TestIntegration_TableCreateView(t *testing.T) { } ctx := context.Background() table := newTable(t, schema) + tableIdentifier, _ := table.Identifier(StandardSQLID) defer table.Delete(ctx) // Test that standard SQL views work. view := dataset.Table("t_view_standardsql") - query := fmt.Sprintf("SELECT APPROX_COUNT_DISTINCT(name) FROM `%s.%s.%s`", - dataset.ProjectID, dataset.DatasetID, table.TableID) + query := fmt.Sprintf("SELECT APPROX_COUNT_DISTINCT(name) FROM %s", tableIdentifier) err := view.Create(context.Background(), &TableMetadata{ ViewQuery: query, UseStandardSQL: true, @@ -936,10 +936,11 @@ func TestIntegration_DatasetUpdateAccess(t *testing.T) { // Create a sample UDF so we can verify adding authorized UDFs routineID := routineIDs.New() routine := dataset.Routine(routineID) + routineSQLID, _ := routine.Identifier(StandardSQLID) sql := fmt.Sprintf(` - CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`, - routine.FullyQualifiedName()) + CREATE FUNCTION %s(x INT64) AS (x * 3);`, + routineSQLID) if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatal(err) } @@ -1348,13 +1349,14 @@ func TestIntegration_RoutineStoredProcedure(t *testing.T) { // Define a simple stored procedure via DDL. routineID := routineIDs.New() routine := dataset.Routine(routineID) + routineSQLID, _ := routine.Identifier(StandardSQLID) sql := fmt.Sprintf(` - CREATE OR REPLACE PROCEDURE `+"`%s`"+`(val INT64) + CREATE OR REPLACE PROCEDURE %s(val INT64) BEGIN SELECT CURRENT_TIMESTAMP() as ts; SELECT val * 2 as f2; END`, - routine.FullyQualifiedName()) + routineSQLID) if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatal(err) @@ -1363,8 +1365,8 @@ func TestIntegration_RoutineStoredProcedure(t *testing.T) { // Invoke the stored procedure. sql = fmt.Sprintf(` - CALL `+"`%s`"+`(5)`, - routine.FullyQualifiedName()) + CALL %s(5)`, + routineSQLID) q := client.Query(sql) it, err := q.Read(ctx) @@ -2354,8 +2356,10 @@ func TestIntegration_QueryExternalHivePartitioning(t *testing.T) { } defer customTable.Delete(ctx) + customTableSQLID, _ := customTable.Identifier(StandardSQLID) + // Issue a test query that prunes based on the custom hive partitioning key, and verify the result is as expected. - sql := fmt.Sprintf("SELECT COUNT(*) as ct FROM `%s`.%s.%s WHERE pkey=\"foo\"", customTable.ProjectID, customTable.DatasetID, customTable.TableID) + sql := fmt.Sprintf("SELECT COUNT(*) as ct FROM %s WHERE pkey=\"foo\"", customTableSQLID) q := client.Query(sql) it, err := q.Read(ctx) if err != nil { @@ -3227,10 +3231,10 @@ func TestIntegration_ModelLifecycle(t *testing.T) { // Create a model via a CREATE MODEL query modelID := modelIDs.New() model := dataset.Model(modelID) - modelRef := fmt.Sprintf("%s.%s.%s", dataset.ProjectID, dataset.DatasetID, modelID) + modelSQLID, _ := model.Identifier(StandardSQLID) sql := fmt.Sprintf(` - CREATE MODEL `+"`%s`"+` + CREATE MODEL %s OPTIONS ( model_type='linear_reg', max_iteration=1, @@ -3240,7 +3244,7 @@ func TestIntegration_ModelLifecycle(t *testing.T) { SELECT 'a' AS f1, 2.0 AS label UNION ALL SELECT 'b' AS f1, 3.8 AS label - )`, modelRef) + )`, modelSQLID) if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatal(err) } @@ -3417,13 +3421,14 @@ func TestIntegration_RoutineComplexTypes(t *testing.T) { routineID := routineIDs.New() routine := dataset.Routine(routineID) + routineSQLID, _ := routine.Identifier(StandardSQLID) sql := fmt.Sprintf(` - CREATE FUNCTION `+"`%s`("+` + CREATE FUNCTION %s( arr ARRAY> ) AS ( (SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem) )`, - routine.FullyQualifiedName()) + routineSQLID) if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatal(err) } @@ -3480,10 +3485,11 @@ func TestIntegration_RoutineLifecycle(t *testing.T) { // Create a scalar UDF routine via a CREATE FUNCTION query routineID := routineIDs.New() routine := dataset.Routine(routineID) + routineSQLID, _ := routine.Identifier(StandardSQLID) sql := fmt.Sprintf(` - CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`, - routine.FullyQualifiedName()) + CREATE FUNCTION %s(x INT64) AS (x * 3);`, + routineSQLID) if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatal(err) } diff --git a/bigquery/model.go b/bigquery/model.go index 363133bf66d..56260efe9c3 100644 --- a/bigquery/model.go +++ b/bigquery/model.go @@ -17,6 +17,7 @@ package bigquery import ( "context" "fmt" + "strings" "time" "cloud.google.com/go/internal/optional" @@ -41,9 +42,31 @@ type Model struct { c *Client } +// Identifier returns the ID of the model in the requested format. +// +// For Standard SQL format, the identifier will be quoted if the +// ProjectID contains dash (-) characters. +func (m *Model) Identifier(f IdentifierFormat) (string, error) { + switch f { + case LegacySQLID: + return fmt.Sprintf("%s:%s.%s", m.ProjectID, m.DatasetID, m.ModelID), nil + case StandardSQLID: + // Per https://cloud.google.com/bigquery-ml/docs/reference/standard-sql/bigqueryml-syntax-create#model_name + // we quote the entire identifier. + out := fmt.Sprintf("%s.%s.%s", m.ProjectID, m.DatasetID, m.ModelID) + if strings.Contains(out, "-") { + out = fmt.Sprintf("`%s`", out) + } + return out, nil + default: + return "", ErrUnknownIdentifierFormat + } +} + // FullyQualifiedName returns the ID of the model in projectID:datasetID.modelid format. func (m *Model) FullyQualifiedName() string { - return fmt.Sprintf("%s:%s.%s", m.ProjectID, m.DatasetID, m.ModelID) + s, _ := m.Identifier(LegacySQLID) + return s } func (m *Model) toBQ() *bq.ModelReference { diff --git a/bigquery/model_test.go b/bigquery/model_test.go index 98b38c864e5..d46bd296312 100644 --- a/bigquery/model_test.go +++ b/bigquery/model_test.go @@ -120,3 +120,64 @@ func TestModelMetadataUpdateToBQ(t *testing.T) { } } } + +func TestModelIdentifiers(t *testing.T) { + testModel := &Model{ + ProjectID: "p", + DatasetID: "d", + ModelID: "m", + c: nil, + } + for _, tc := range []struct { + description string + in *Model + format IdentifierFormat + want string + wantErr bool + }{ + { + description: "empty format string", + in: testModel, + format: "", + wantErr: true, + }, + { + description: "legacy", + in: testModel, + format: LegacySQLID, + want: "p:d.m", + }, + { + description: "standard unquoted", + in: testModel, + format: StandardSQLID, + want: "p.d.m", + }, + { + description: "standard w/dash", + in: &Model{ProjectID: "p-p", DatasetID: "d", ModelID: "m"}, + format: StandardSQLID, + want: "`p-p.d.m`", + }, + { + description: "api resource", + in: testModel, + format: StorageAPIResourceID, + wantErr: true, + }, + } { + got, err := tc.in.Identifier(tc.format) + if tc.wantErr && err == nil { + t.Errorf("case %q: wanted err, was success", tc.description) + } + if !tc.wantErr { + if err != nil { + t.Errorf("case %q: wanted success, got err: %v", tc.description, err) + } else { + if got != tc.want { + t.Errorf("case %q: got %s, want %s", tc.description, got, tc.want) + } + } + } + } +} diff --git a/bigquery/routine.go b/bigquery/routine.go index 46c8ca398f9..20011cf3a17 100644 --- a/bigquery/routine.go +++ b/bigquery/routine.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "strings" "time" "cloud.google.com/go/internal/optional" @@ -44,9 +45,26 @@ func (r *Routine) toBQ() *bq.RoutineReference { } } +// Identifier returns the ID of the routine in the requested format. +// +// For Standard SQL format, the identifier will be quoted if the +// ProjectID contains dash (-) characters. +func (r *Routine) Identifier(f IdentifierFormat) (string, error) { + switch f { + case StandardSQLID: + if strings.Contains(r.ProjectID, "-") { + return fmt.Sprintf("`%s`.%s.%s", r.ProjectID, r.DatasetID, r.RoutineID), nil + } + return fmt.Sprintf("%s.%s.%s", r.ProjectID, r.DatasetID, r.RoutineID), nil + default: + return "", ErrUnknownIdentifierFormat + } +} + // FullyQualifiedName returns an identifer for the routine in project.dataset.routine format. func (r *Routine) FullyQualifiedName() string { - return fmt.Sprintf("%s.%s.%s", r.ProjectID, r.DatasetID, r.RoutineID) + s, _ := r.Identifier(StandardSQLID) + return s } // Create creates a Routine in the BigQuery service. diff --git a/bigquery/routine_test.go b/bigquery/routine_test.go index 697713c86da..59210ff9a01 100644 --- a/bigquery/routine_test.go +++ b/bigquery/routine_test.go @@ -167,3 +167,63 @@ func TestRoutineTypeConversions(t *testing.T) { }) } } + +func TestRoutineIdentifiers(t *testing.T) { + testRoutine := &Routine{ + ProjectID: "p", + DatasetID: "d", + RoutineID: "r", + c: nil, + } + for _, tc := range []struct { + description string + in *Routine + format IdentifierFormat + want string + wantErr bool + }{ + { + description: "empty format string", + in: testRoutine, + format: "", + wantErr: true, + }, + { + description: "legacy", + in: testRoutine, + wantErr: true, + }, + { + description: "standard unquoted", + in: testRoutine, + format: StandardSQLID, + want: "p.d.r", + }, + { + description: "standard w/dash", + in: &Routine{ProjectID: "p-p", DatasetID: "d", RoutineID: "r"}, + format: StandardSQLID, + want: "`p-p`.d.r", + }, + { + description: "api resource", + in: testRoutine, + format: StorageAPIResourceID, + wantErr: true, + }, + } { + got, err := tc.in.Identifier(tc.format) + if tc.wantErr && err == nil { + t.Errorf("case %q: wanted err, was success", tc.description) + } + if !tc.wantErr { + if err != nil { + t.Errorf("case %q: wanted success, got err: %v", tc.description, err) + } else { + if got != tc.want { + t.Errorf("case %q: got %s, want %s", tc.description, got, tc.want) + } + } + } + } +} diff --git a/bigquery/storage/managedwriter/appendresult.go b/bigquery/storage/managedwriter/appendresult.go index 020d0de741a..8a9d3323976 100644 --- a/bigquery/storage/managedwriter/appendresult.go +++ b/bigquery/storage/managedwriter/appendresult.go @@ -19,6 +19,7 @@ import ( storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/descriptorpb" "google.golang.org/protobuf/types/known/wrapperspb" ) @@ -66,7 +67,9 @@ func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) { // append request. type pendingWrite struct { request *storagepb.AppendRowsRequest - result *AppendResult + // for schema evolution cases, accept a new schema + newSchema *descriptorpb.DescriptorProto + result *AppendResult // this is used by the flow controller. reqSize int @@ -77,7 +80,7 @@ type pendingWrite struct { // that in the future, we may want to allow row batching to be managed by // the server (e.g. for default/COMMITTED streams). For BUFFERED/PENDING // streams, this should be managed by the user. -func newPendingWrite(appends [][]byte, offset int64) *pendingWrite { +func newPendingWrite(appends [][]byte) *pendingWrite { pw := &pendingWrite{ request: &storagepb.AppendRowsRequest{ Rows: &storagepb.AppendRowsRequest_ProtoRows{ @@ -90,9 +93,6 @@ func newPendingWrite(appends [][]byte, offset int64) *pendingWrite { }, result: newAppendResult(appends), } - if offset > 0 { - pw.request.Offset = &wrapperspb.Int64Value{Value: offset} - } // We compute the size now for flow controller purposes, though // the actual request size may be slightly larger (e.g. the first // request in a new stream bears schema and stream id). @@ -114,3 +114,23 @@ func (pw *pendingWrite) markDone(startOffset int64, err error, fc *flowControlle fc.release(pw.reqSize) } } + +// AppendOption are options that can be passed when appending data with a managed stream instance. +type AppendOption func(*pendingWrite) + +// UpdateSchemaDescriptor is used to update the descriptor message schema associated +// with a given stream. +func UpdateSchemaDescriptor(schema *descriptorpb.DescriptorProto) AppendOption { + return func(pw *pendingWrite) { + pw.newSchema = schema + } +} + +// WithOffset sets an explicit offset value for this append request. +func WithOffset(offset int64) AppendOption { + return func(pw *pendingWrite) { + pw.request.Offset = &wrapperspb.Int64Value{ + Value: offset, + } + } +} diff --git a/bigquery/storage/managedwriter/appendresult_test.go b/bigquery/storage/managedwriter/appendresult_test.go index e30a53fb6d2..d760f5b8e97 100644 --- a/bigquery/storage/managedwriter/appendresult_test.go +++ b/bigquery/storage/managedwriter/appendresult_test.go @@ -43,38 +43,33 @@ func TestPendingWrite(t *testing.T) { []byte("row3"), } - var wantOffset int64 = 99 - - // first, verify no offset behavior - pending := newPendingWrite(wantRowData, NoStreamOffset) + // verify no offset behavior + pending := newPendingWrite(wantRowData) if pending.request.GetOffset() != nil { t.Errorf("request should have no offset, but is present: %q", pending.request.GetOffset().GetValue()) } - pending.markDone(NoStreamOffset, nil, nil) - if pending.result.offset != NoStreamOffset { - t.Errorf("mismatch on completed AppendResult without offset: got %d want %d", pending.result.offset, NoStreamOffset) - } - if pending.result.err != nil { - t.Errorf("mismatch in error on AppendResult, got %v want nil", pending.result.err) - } - - // now, verify behavior with a valid offset - pending = newPendingWrite(wantRowData, 99) - if pending.request.GetOffset() == nil { - t.Errorf("offset not set, should be %d", wantOffset) - } - if gotOffset := pending.request.GetOffset().GetValue(); gotOffset != wantOffset { - t.Errorf("offset mismatch, got %d want %d", gotOffset, wantOffset) - } - // check request shape gotRowCount := len(pending.request.GetProtoRows().GetRows().GetSerializedRows()) if gotRowCount != len(wantRowData) { t.Errorf("pendingWrite request mismatch, got %d rows, want %d rows", gotRowCount, len(wantRowData)) } - // verify AppendResult + // Verify request is not acknowledged. + select { + case <-pending.result.Ready(): + t.Errorf("got Ready() on incomplete AppendResult") + case <-time.After(100 * time.Millisecond): + + } + // Mark completed, verify result. + pending.markDone(NoStreamOffset, nil, nil) + if pending.result.offset != NoStreamOffset { + t.Errorf("mismatch on completed AppendResult without offset: got %d want %d", pending.result.offset, NoStreamOffset) + } + if pending.result.err != nil { + t.Errorf("mismatch in error on AppendResult, got %v want nil", pending.result.err) + } gotData := pending.result.rowData if len(gotData) != len(wantRowData) { t.Errorf("length mismatch on appendresult, got %d, want %d", len(gotData), len(wantRowData)) @@ -84,15 +79,24 @@ func TestPendingWrite(t *testing.T) { t.Errorf("row %d mismatch in data: got %q want %q", i, gotData[i], wantRowData[i]) } } - select { - case <-pending.result.Ready(): - t.Errorf("got Ready() on incomplete AppendResult") - case <-time.After(100 * time.Millisecond): - } + // Create new write to verify error result. + pending = newPendingWrite(wantRowData) - // verify completion behavior + // Manually invoke option to apply offset to request. + // This would normally be appied as part of the AppendRows() method on the managed stream. reportedOffset := int64(101) + f := WithOffset(reportedOffset) + f(pending) + + if pending.request.GetOffset() == nil { + t.Errorf("expected offset, got none") + } + if pending.request.GetOffset().GetValue() != reportedOffset { + t.Errorf("offset mismatch, got %d wanted %d", pending.request.GetOffset().GetValue(), reportedOffset) + } + + // Verify completion behavior with an error. wantErr := fmt.Errorf("foo") pending.markDone(reportedOffset, wantErr, nil) diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index d0ebca0f5de..7573c23a4fc 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -144,6 +144,10 @@ func TestIntegration_ManagedWriter(t *testing.T) { t.Parallel() testPendingStream(ctx, t, mwClient, bqClient, dataset) }) + t.Run("SchemaEvolution", func(t *testing.T) { + t.Parallel() + testSchemaEvolution(ctx, t, mwClient, bqClient, dataset) + }) t.Run("Instrumentation", func(t *testing.T) { // Don't run this in parallel, we only want to collect stats from this subtest. testInstrumentation(ctx, t, mwClient, bqClient, dataset) @@ -181,7 +185,7 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl t.Errorf("failed to marshal message %d: %v", k, err) } data := [][]byte{b} - result, err = ms.AppendRows(ctx, data, NoStreamOffset) + result, err = ms.AppendRows(ctx, data) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } @@ -201,7 +205,7 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl } data = append(data, b) } - result, err = ms.AppendRows(ctx, data, NoStreamOffset) + result, err = ms.AppendRows(ctx, data) if err != nil { t.Errorf("grouped-row append failed: %v", err) } @@ -256,7 +260,7 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C if err != nil { t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err) } - result, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset) + result, err = ms.AppendRows(ctx, [][]byte{b}) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } @@ -305,7 +309,7 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC t.Errorf("failed to marshal message %d: %v", k, err) } data := [][]byte{b} - results, err := ms.AppendRows(ctx, data, NoStreamOffset) + results, err := ms.AppendRows(ctx, data) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } @@ -358,7 +362,7 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq t.Errorf("failed to marshal message %d: %v", k, err) } data := [][]byte{b} - result, err = ms.AppendRows(ctx, data, NoStreamOffset) + result, err = ms.AppendRows(ctx, data, WithOffset(int64(k))) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } @@ -397,12 +401,19 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl t.Errorf("failed to marshal message %d: %v", k, err) } data := [][]byte{b} - result, err = ms.AppendRows(ctx, data, NoStreamOffset) + result, err = ms.AppendRows(ctx, data, WithOffset(int64(k))) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } + // be explicit about waiting/checking each response. + off, err := result.GetResult(ctx) + if err != nil { + t.Errorf("response %d error: %v", k, err) + } + if off != int64(k) { + t.Errorf("offset mismatch, got %d want %d", off, k) + } } - result.Ready() wantRows := int64(len(testSimpleData)) // Mark stream complete. @@ -468,7 +479,7 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq t.Errorf("failed to marshal message %d: %v", k, err) } data := [][]byte{b} - result, err = ms.AppendRows(ctx, data, NoStreamOffset) + result, err = ms.AppendRows(ctx, data) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } @@ -513,6 +524,85 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq } } +func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { + testTable := dataset.Table(tableIDs.New()) + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { + t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) + } + + m := &testdata.SimpleMessageProto2{} + descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) + + // setup a new stream. + ms, err := mwClient.NewManagedStream(ctx, + WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), + WithType(CommittedStream), + WithSchemaDescriptor(descriptorProto), + ) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } + validateTableConstraints(ctx, t, bqClient, testTable, "before send", + withExactRowCount(0)) + + var result *AppendResult + for k, mesg := range testSimpleData { + b, err := proto.Marshal(mesg) + if err != nil { + t.Errorf("failed to marshal message %d: %v", k, err) + } + data := [][]byte{b} + result, err = ms.AppendRows(ctx, data) + if err != nil { + t.Errorf("single-row append %d failed: %v", k, err) + } + } + // wait for the result to indicate ready, then validate. + _, err = result.GetResult(ctx) + if err != nil { + t.Errorf("error on append: %v", err) + } + + validateTableConstraints(ctx, t, bqClient, testTable, "after send", + withExactRowCount(int64(len(testSimpleData)))) + + // Now, evolve the underlying table schema. + _, err = testTable.Update(ctx, bigquery.TableMetadataToUpdate{Schema: testdata.SimpleMessageEvolvedSchema}, "") + if err != nil { + t.Errorf("failed to evolve table schema: %v", err) + } + + // TODO: we need a more elegant mechanism for detecting when the backend has registered the schema change. + // In the continuous case, we'd get it from the response, but the change-and-wait case needs something more. + time.Sleep(6 * time.Second) + + // ready descriptor, send an additional append + m2 := &testdata.SimpleMessageEvolvedProto2{ + Name: proto.String("evolved"), + Value: proto.Int64(180), + Other: proto.String("hello evolution"), + } + descriptorProto = protodesc.ToDescriptorProto(m2.ProtoReflect().Descriptor()) + b, err := proto.Marshal(m2) + if err != nil { + t.Errorf("failed to marshal evolved message: %v", err) + } + result, err = ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto)) + if err != nil { + t.Errorf("failed evolved append: %v", err) + } + _, err = result.GetResult(ctx) + if err != nil { + t.Errorf("error on evolved append: %v", err) + } + + validateTableConstraints(ctx, t, bqClient, testTable, "after send", + withExactRowCount(int64(len(testSimpleData)+1)), + withNullCount("name", 0), + withNonNullCount("other", 1), + ) +} + func TestIntegration_DetectProjectID(t *testing.T) { ctx := context.Background() testCreds := testutil.Credentials(ctx) @@ -613,7 +703,7 @@ func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client, if err != nil { t.Fatalf("NewManagedStream: %v", err) } - result, err := ms.AppendRows(ctx, [][]byte{sampleRow}, NoStreamOffset) + result, err := ms.AppendRows(ctx, [][]byte{sampleRow}) if err != nil { t.Errorf("append failed: %v", err) } diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 2a419652e6c..7e26d53d2f8 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -178,7 +178,7 @@ func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) ( // getStream returns either a valid ARC client stream or permanent error. // // Calling getStream locks the mutex. -func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { +func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient, forceReconnect bool) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { ms.mu.Lock() defer ms.mu.Unlock() if ms.err != nil { @@ -190,9 +190,16 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient } // Always return the retained ARC if the arg differs. - if arc != ms.arc { + if arc != ms.arc && !forceReconnect { return ms.arc, ms.pending, nil } + if arc != ms.arc && forceReconnect && ms.arc != nil { + // In this case, we're forcing a close to apply changes to the stream + // that currently can't be modified on an established connection. + // + // TODO: clean this up once internal issue 205756033 is resolved. + (*ms.arc).CloseSend() + } ms.arc = new(storagepb.BigQueryWrite_AppendRowsClient) *ms.arc, ms.pending, ms.err = ms.openWithRetry() @@ -248,7 +255,7 @@ func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error var err error for { - arc, ch, err = ms.getStream(arc) + arc, ch, err = ms.getStream(arc, pw.newSchema != nil) if err != nil { return err } @@ -304,7 +311,7 @@ func (ms *ManagedStream) Close() error { var arc *storagepb.BigQueryWrite_AppendRowsClient - arc, ch, err := ms.getStream(arc) + arc, ch, err := ms.getStream(arc, false) if err != nil { return err } @@ -328,15 +335,30 @@ func (ms *ManagedStream) Close() error { // AppendRows sends the append requests to the service, and returns a single AppendResult for tracking // the set of data. // -// The format of the row data is binary serialized protocol buffer bytes, and and the message -// must adhere to the format of the schema Descriptor passed in when creating the managed stream. -func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) (*AppendResult, error) { - pw := newPendingWrite(data, offset) +// The format of the row data is binary serialized protocol buffer bytes. The message must be compatible +// with the schema currently set for the stream. +// +// Use the WithOffset() AppendOption to set an explicit offset for this append. Setting an offset for +// a default stream is unsupported. +func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...AppendOption) (*AppendResult, error) { + pw := newPendingWrite(data) + // apply AppendOption opts + for _, opt := range opts { + opt(pw) + } // check flow control if err := ms.fc.acquire(ctx, pw.reqSize); err != nil { // in this case, we didn't acquire, so don't pass the flow controller reference to avoid a release. pw.markDone(NoStreamOffset, err, nil) } + // if we've received an updated schema as part of a write, propagate it to both the cached schema and + // populate the schema in the request. + if pw.newSchema != nil { + ms.schemaDescriptor = pw.newSchema + pw.request.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{ + ProtoDescriptor: pw.newSchema, + } + } // proceed to call if err := ms.append(pw); err != nil { // pending write is DOA. diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index 94d087c44ba..06a385e402c 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -127,7 +127,7 @@ func TestManagedStream_FirstAppendBehavior(t *testing.T) { wantReqs := 3 for i := 0; i < wantReqs; i++ { - _, err := ms.AppendRows(ctx, fakeData, NoStreamOffset) + _, err := ms.AppendRows(ctx, fakeData, WithOffset(int64(i))) if err != nil { t.Errorf("AppendRows; %v", err) } @@ -145,6 +145,14 @@ func TestManagedStream_FirstAppendBehavior(t *testing.T) { if v == nil { t.Errorf("request %d was nil", k) } + if v.GetOffset() == nil { + t.Errorf("request %d had no offset", k) + } else { + gotOffset := v.GetOffset().GetValue() + if gotOffset != int64(k) { + t.Errorf("request %d wanted offset %d, got %d", k, k, gotOffset) + } + } if k == 0 { if v.GetTraceId() == "" { t.Errorf("expected TraceId on first request, was empty") diff --git a/bigquery/storage/managedwriter/testdata/messages_proto2.pb.go b/bigquery/storage/managedwriter/testdata/messages_proto2.pb.go index d57ac20be31..b287799d885 100644 --- a/bigquery/storage/managedwriter/testdata/messages_proto2.pb.go +++ b/bigquery/storage/managedwriter/testdata/messages_proto2.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.25.0 -// protoc v3.10.1 +// protoc-gen-go v1.27.1 +// protoc v3.17.3 // source: messages_proto2.proto package testdata @@ -24,7 +24,6 @@ import ( reflect "reflect" sync "sync" - proto "github.com/golang/protobuf/proto" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) @@ -36,10 +35,6 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -// This is a compile-time assertion that a sufficiently up-to-date version -// of the legacy proto package is being used. -const _ = proto.ProtoPackageIsVersion4 - // SimpleMessage represents a simple message that transmits a string and int64 value. type SimpleMessageProto2 struct { state protoimpl.MessageState @@ -98,6 +93,72 @@ func (x *SimpleMessageProto2) GetValue() int64 { return 0 } +type SimpleMessageEvolvedProto2 struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // name is a simple scalar string. + Name *string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` + // value is a simple int64 value. + Value *int64 `protobuf:"varint,2,opt,name=value" json:"value,omitempty"` + // other is an additional string. + Other *string `protobuf:"bytes,3,opt,name=other" json:"other,omitempty"` +} + +func (x *SimpleMessageEvolvedProto2) Reset() { + *x = SimpleMessageEvolvedProto2{} + if protoimpl.UnsafeEnabled { + mi := &file_messages_proto2_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SimpleMessageEvolvedProto2) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SimpleMessageEvolvedProto2) ProtoMessage() {} + +func (x *SimpleMessageEvolvedProto2) ProtoReflect() protoreflect.Message { + mi := &file_messages_proto2_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SimpleMessageEvolvedProto2.ProtoReflect.Descriptor instead. +func (*SimpleMessageEvolvedProto2) Descriptor() ([]byte, []int) { + return file_messages_proto2_proto_rawDescGZIP(), []int{1} +} + +func (x *SimpleMessageEvolvedProto2) GetName() string { + if x != nil && x.Name != nil { + return *x.Name + } + return "" +} + +func (x *SimpleMessageEvolvedProto2) GetValue() int64 { + if x != nil && x.Value != nil { + return *x.Value + } + return 0 +} + +func (x *SimpleMessageEvolvedProto2) GetOther() string { + if x != nil && x.Other != nil { + return *x.Other + } + return "" +} + type GithubArchiveEntityProto2 struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -113,7 +174,7 @@ type GithubArchiveEntityProto2 struct { func (x *GithubArchiveEntityProto2) Reset() { *x = GithubArchiveEntityProto2{} if protoimpl.UnsafeEnabled { - mi := &file_messages_proto2_proto_msgTypes[1] + mi := &file_messages_proto2_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -126,7 +187,7 @@ func (x *GithubArchiveEntityProto2) String() string { func (*GithubArchiveEntityProto2) ProtoMessage() {} func (x *GithubArchiveEntityProto2) ProtoReflect() protoreflect.Message { - mi := &file_messages_proto2_proto_msgTypes[1] + mi := &file_messages_proto2_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -139,7 +200,7 @@ func (x *GithubArchiveEntityProto2) ProtoReflect() protoreflect.Message { // Deprecated: Use GithubArchiveEntityProto2.ProtoReflect.Descriptor instead. func (*GithubArchiveEntityProto2) Descriptor() ([]byte, []int) { - return file_messages_proto2_proto_rawDescGZIP(), []int{1} + return file_messages_proto2_proto_rawDescGZIP(), []int{2} } func (x *GithubArchiveEntityProto2) GetId() int64 { @@ -190,7 +251,7 @@ type GithubArchiveRepoProto2 struct { func (x *GithubArchiveRepoProto2) Reset() { *x = GithubArchiveRepoProto2{} if protoimpl.UnsafeEnabled { - mi := &file_messages_proto2_proto_msgTypes[2] + mi := &file_messages_proto2_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -203,7 +264,7 @@ func (x *GithubArchiveRepoProto2) String() string { func (*GithubArchiveRepoProto2) ProtoMessage() {} func (x *GithubArchiveRepoProto2) ProtoReflect() protoreflect.Message { - mi := &file_messages_proto2_proto_msgTypes[2] + mi := &file_messages_proto2_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -216,7 +277,7 @@ func (x *GithubArchiveRepoProto2) ProtoReflect() protoreflect.Message { // Deprecated: Use GithubArchiveRepoProto2.ProtoReflect.Descriptor instead. func (*GithubArchiveRepoProto2) Descriptor() ([]byte, []int) { - return file_messages_proto2_proto_rawDescGZIP(), []int{2} + return file_messages_proto2_proto_rawDescGZIP(), []int{3} } func (x *GithubArchiveRepoProto2) GetId() int64 { @@ -260,7 +321,7 @@ type GithubArchiveMessageProto2 struct { func (x *GithubArchiveMessageProto2) Reset() { *x = GithubArchiveMessageProto2{} if protoimpl.UnsafeEnabled { - mi := &file_messages_proto2_proto_msgTypes[3] + mi := &file_messages_proto2_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -273,7 +334,7 @@ func (x *GithubArchiveMessageProto2) String() string { func (*GithubArchiveMessageProto2) ProtoMessage() {} func (x *GithubArchiveMessageProto2) ProtoReflect() protoreflect.Message { - mi := &file_messages_proto2_proto_msgTypes[3] + mi := &file_messages_proto2_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -286,7 +347,7 @@ func (x *GithubArchiveMessageProto2) ProtoReflect() protoreflect.Message { // Deprecated: Use GithubArchiveMessageProto2.ProtoReflect.Descriptor instead. func (*GithubArchiveMessageProto2) Descriptor() ([]byte, []int) { - return file_messages_proto2_proto_rawDescGZIP(), []int{3} + return file_messages_proto2_proto_rawDescGZIP(), []int{4} } func (x *GithubArchiveMessageProto2) GetType() string { @@ -361,46 +422,52 @@ var file_messages_proto2_proto_rawDesc = []byte{ 0x67, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c, - 0x75, 0x65, 0x22, 0x93, 0x01, 0x0a, 0x19, 0x47, 0x69, 0x74, 0x68, 0x75, 0x62, 0x41, 0x72, 0x63, + 0x75, 0x65, 0x22, 0x5c, 0x0a, 0x1a, 0x53, 0x69, 0x6d, 0x70, 0x6c, 0x65, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x45, 0x76, 0x6f, 0x6c, 0x76, 0x65, 0x64, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, + 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x74, + 0x68, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x74, 0x68, 0x65, 0x72, + 0x22, 0x93, 0x01, 0x0a, 0x19, 0x47, 0x69, 0x74, 0x68, 0x75, 0x62, 0x41, 0x72, 0x63, 0x68, 0x69, + 0x76, 0x65, 0x45, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, + 0x0a, 0x05, 0x6c, 0x6f, 0x67, 0x69, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6c, + 0x6f, 0x67, 0x69, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, 0x67, 0x72, 0x61, 0x76, 0x61, 0x74, 0x61, 0x72, + 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x67, 0x72, 0x61, 0x76, 0x61, + 0x74, 0x61, 0x72, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x61, 0x76, 0x61, 0x74, 0x61, 0x72, 0x5f, + 0x75, 0x72, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x76, 0x61, 0x74, 0x61, + 0x72, 0x55, 0x72, 0x6c, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x22, 0x4f, 0x0a, 0x17, 0x47, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x50, 0x72, 0x6f, 0x74, 0x6f, + 0x32, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02, 0x69, + 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x22, 0xd0, 0x02, 0x0a, 0x1a, 0x47, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x75, + 0x62, 0x6c, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x70, 0x75, 0x62, 0x6c, + 0x69, 0x63, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x35, 0x0a, 0x04, + 0x72, 0x65, 0x70, 0x6f, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x74, 0x65, 0x73, + 0x74, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x47, 0x69, 0x74, 0x68, 0x75, 0x62, 0x41, 0x72, 0x63, 0x68, + 0x69, 0x76, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x52, 0x04, 0x72, + 0x65, 0x70, 0x6f, 0x12, 0x39, 0x0a, 0x05, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x47, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x45, 0x6e, 0x74, 0x69, 0x74, + 0x79, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x52, 0x05, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x35, + 0x0a, 0x03, 0x6f, 0x72, 0x67, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x74, 0x65, + 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x47, 0x69, 0x74, 0x68, 0x75, 0x62, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x45, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, - 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02, 0x69, 0x64, - 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x6f, 0x67, 0x69, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x6c, 0x6f, 0x67, 0x69, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, 0x67, 0x72, 0x61, 0x76, 0x61, 0x74, - 0x61, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x67, 0x72, 0x61, - 0x76, 0x61, 0x74, 0x61, 0x72, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x61, 0x76, 0x61, 0x74, 0x61, - 0x72, 0x5f, 0x75, 0x72, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x76, 0x61, - 0x74, 0x61, 0x72, 0x55, 0x72, 0x6c, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x05, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x22, 0x4f, 0x0a, 0x17, 0x47, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x50, 0x72, 0x6f, - 0x74, 0x6f, 0x32, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, - 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x22, 0xd0, 0x02, 0x0a, 0x1a, 0x47, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, - 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x70, 0x75, - 0x62, 0x6c, 0x69, 0x63, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x35, - 0x0a, 0x04, 0x72, 0x65, 0x70, 0x6f, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x74, - 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x47, 0x69, 0x74, 0x68, 0x75, 0x62, 0x41, 0x72, - 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x52, - 0x04, 0x72, 0x65, 0x70, 0x6f, 0x12, 0x39, 0x0a, 0x05, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x18, 0x05, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x2e, - 0x47, 0x69, 0x74, 0x68, 0x75, 0x62, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x45, 0x6e, 0x74, - 0x69, 0x74, 0x79, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x52, 0x05, 0x61, 0x63, 0x74, 0x6f, 0x72, - 0x12, 0x35, 0x0a, 0x03, 0x6f, 0x72, 0x67, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, - 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x47, 0x69, 0x74, 0x68, 0x75, 0x62, 0x41, - 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x45, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x50, 0x72, 0x6f, 0x74, - 0x6f, 0x32, 0x52, 0x03, 0x6f, 0x72, 0x67, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, - 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x63, 0x72, 0x65, - 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x08, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x18, - 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x42, 0x3d, 0x5a, 0x3b, - 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x67, 0x6f, 0x2f, 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2f, 0x73, 0x74, 0x6f, - 0x72, 0x61, 0x67, 0x65, 0x2f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x64, 0x77, 0x72, 0x69, 0x74, - 0x65, 0x72, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, + 0x52, 0x03, 0x6f, 0x72, 0x67, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, + 0x5f, 0x61, 0x74, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, + 0x65, 0x64, 0x41, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x18, 0x09, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x42, 0x3d, 0x5a, 0x3b, 0x63, 0x6c, + 0x6f, 0x75, 0x64, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, + 0x6f, 0x2f, 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, + 0x67, 0x65, 0x2f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x64, 0x77, 0x72, 0x69, 0x74, 0x65, 0x72, + 0x2f, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, } var ( @@ -415,17 +482,18 @@ func file_messages_proto2_proto_rawDescGZIP() []byte { return file_messages_proto2_proto_rawDescData } -var file_messages_proto2_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_messages_proto2_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_messages_proto2_proto_goTypes = []interface{}{ (*SimpleMessageProto2)(nil), // 0: testdata.SimpleMessageProto2 - (*GithubArchiveEntityProto2)(nil), // 1: testdata.GithubArchiveEntityProto2 - (*GithubArchiveRepoProto2)(nil), // 2: testdata.GithubArchiveRepoProto2 - (*GithubArchiveMessageProto2)(nil), // 3: testdata.GithubArchiveMessageProto2 + (*SimpleMessageEvolvedProto2)(nil), // 1: testdata.SimpleMessageEvolvedProto2 + (*GithubArchiveEntityProto2)(nil), // 2: testdata.GithubArchiveEntityProto2 + (*GithubArchiveRepoProto2)(nil), // 3: testdata.GithubArchiveRepoProto2 + (*GithubArchiveMessageProto2)(nil), // 4: testdata.GithubArchiveMessageProto2 } var file_messages_proto2_proto_depIdxs = []int32{ - 2, // 0: testdata.GithubArchiveMessageProto2.repo:type_name -> testdata.GithubArchiveRepoProto2 - 1, // 1: testdata.GithubArchiveMessageProto2.actor:type_name -> testdata.GithubArchiveEntityProto2 - 1, // 2: testdata.GithubArchiveMessageProto2.org:type_name -> testdata.GithubArchiveEntityProto2 + 3, // 0: testdata.GithubArchiveMessageProto2.repo:type_name -> testdata.GithubArchiveRepoProto2 + 2, // 1: testdata.GithubArchiveMessageProto2.actor:type_name -> testdata.GithubArchiveEntityProto2 + 2, // 2: testdata.GithubArchiveMessageProto2.org:type_name -> testdata.GithubArchiveEntityProto2 3, // [3:3] is the sub-list for method output_type 3, // [3:3] is the sub-list for method input_type 3, // [3:3] is the sub-list for extension type_name @@ -452,7 +520,7 @@ func file_messages_proto2_proto_init() { } } file_messages_proto2_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GithubArchiveEntityProto2); i { + switch v := v.(*SimpleMessageEvolvedProto2); i { case 0: return &v.state case 1: @@ -464,7 +532,7 @@ func file_messages_proto2_proto_init() { } } file_messages_proto2_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GithubArchiveRepoProto2); i { + switch v := v.(*GithubArchiveEntityProto2); i { case 0: return &v.state case 1: @@ -476,6 +544,18 @@ func file_messages_proto2_proto_init() { } } file_messages_proto2_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GithubArchiveRepoProto2); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_messages_proto2_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GithubArchiveMessageProto2); i { case 0: return &v.state @@ -494,7 +574,7 @@ func file_messages_proto2_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_messages_proto2_proto_rawDesc, NumEnums: 0, - NumMessages: 4, + NumMessages: 5, NumExtensions: 0, NumServices: 0, }, diff --git a/bigquery/storage/managedwriter/testdata/messages_proto2.proto b/bigquery/storage/managedwriter/testdata/messages_proto2.proto index fdb2fb55e47..3881fdf518e 100644 --- a/bigquery/storage/managedwriter/testdata/messages_proto2.proto +++ b/bigquery/storage/managedwriter/testdata/messages_proto2.proto @@ -21,11 +21,21 @@ option go_package = "cloud.google.com/go/bigquery/storage/managedwriter/testdata message SimpleMessageProto2 { // name is a simple scalar string. optional string name = 1; + // value is a simple int64 value. optional int64 value = 2; } +message SimpleMessageEvolvedProto2 { + // name is a simple scalar string. + optional string name = 1; + // value is a simple int64 value. + optional int64 value = 2; + + // other is an additional string. + optional string other = 3; +} message GithubArchiveEntityProto2 { optional int64 id = 1; diff --git a/bigquery/storage/managedwriter/testdata/schemas.go b/bigquery/storage/managedwriter/testdata/schemas.go index 74f230414ec..5ddda986a6b 100644 --- a/bigquery/storage/managedwriter/testdata/schemas.go +++ b/bigquery/storage/managedwriter/testdata/schemas.go @@ -22,6 +22,12 @@ var ( {Name: "value", Type: bigquery.IntegerFieldType}, } + SimpleMessageEvolvedSchema bigquery.Schema = bigquery.Schema{ + {Name: "name", Type: bigquery.StringFieldType, Required: true}, + {Name: "value", Type: bigquery.IntegerFieldType}, + {Name: "other", Type: bigquery.StringFieldType}, + } + GithubArchiveSchema bigquery.Schema = bigquery.Schema{ {Name: "type", Type: bigquery.StringFieldType}, {Name: "public", Type: bigquery.BooleanFieldType}, diff --git a/bigquery/storage/managedwriter/testutils_test.go b/bigquery/storage/managedwriter/testutils_test.go index 439d958d05e..e6131b73055 100644 --- a/bigquery/storage/managedwriter/testutils_test.go +++ b/bigquery/storage/managedwriter/testutils_test.go @@ -126,23 +126,23 @@ func withExactRowCount(totalRows int64) constraintOption { } // withNullCount asserts the number of null values in a column. -func withNullCount(colname string, nullcount int64) constraintOption { +func withNullCount(colname string, nullCount int64) constraintOption { return func(vi *validationInfo) { resultCol := fmt.Sprintf("nullcol_count_%s", colname) vi.constraints[resultCol] = &constraint{ - projection: fmt.Sprintf("COUNTIF(ISNULL(%s)) AS %s", colname, resultCol), - expectedValue: nullcount, + projection: fmt.Sprintf("SUM(IF(%s IS NULL,1,0)) AS %s", colname, resultCol), + expectedValue: nullCount, } } } // withNonNullCount asserts the number of non null values in a column. -func withNonNullCount(colname string, nullcount int64) constraintOption { +func withNonNullCount(colname string, nonNullCount int64) constraintOption { return func(vi *validationInfo) { resultCol := fmt.Sprintf("nonnullcol_count_%s", colname) vi.constraints[resultCol] = &constraint{ - projection: fmt.Sprintf("COUNTIF(NOT ISNULL(%s)) AS %s", colname, resultCol), - expectedValue: nullcount, + projection: fmt.Sprintf("SUM(IF(%s IS NOT NULL,1,0)) AS %s", colname, resultCol), + expectedValue: nonNullCount, } } } diff --git a/bigquery/table.go b/bigquery/table.go index 202b2f737f2..764e6b27a7c 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -471,9 +471,46 @@ func (t *Table) toBQ() *bq.TableReference { } } +// IdentifierFormat represents a how certain resource identifiers such as table references +// are formatted. +type IdentifierFormat string + +var ( + // StandardSQLID returns an identifier suitable for use with Standard SQL. + StandardSQLID IdentifierFormat = "SQL" + + // LegacySQLID returns an identifier suitable for use with Legacy SQL. + LegacySQLID IdentifierFormat = "LEGACY_SQL" + + // StorageAPIResourceID returns an identifier suitable for use with the Storage API. Namely, it's for formatting + // a table resource for invoking read and write functionality. + StorageAPIResourceID IdentifierFormat = "STORAGE_API_RESOURCE" + + // ErrUnknownIdentifierFormat is indicative of requesting an identifier in a format that is + // not supported. + ErrUnknownIdentifierFormat = errors.New("unknown identifier format") +) + +// Identifier returns the ID of the table in the requested format. +func (t *Table) Identifier(f IdentifierFormat) (string, error) { + switch f { + case LegacySQLID: + return fmt.Sprintf("%s:%s.%s", t.ProjectID, t.DatasetID, t.TableID), nil + case StorageAPIResourceID: + return fmt.Sprintf("projects/%s/datasets/%s/tables/%s", t.ProjectID, t.DatasetID, t.TableID), nil + case StandardSQLID: + // Note we don't need to quote the project ID here, as StandardSQL has special rules to allow + // dash identifiers for projects without issue in table identifiers. + return fmt.Sprintf("%s.%s.%s", t.ProjectID, t.DatasetID, t.TableID), nil + default: + return "", ErrUnknownIdentifierFormat + } +} + // FullyQualifiedName returns the ID of the table in projectID:datasetID.tableID format. func (t *Table) FullyQualifiedName() string { - return fmt.Sprintf("%s:%s.%s", t.ProjectID, t.DatasetID, t.TableID) + s, _ := t.Identifier(LegacySQLID) + return s } // implicitTable reports whether Table is an empty placeholder, which signifies that a new table should be created with an auto-generated Table ID. diff --git a/bigquery/table_test.go b/bigquery/table_test.go index bc0d47e66ad..d56c71c2bc1 100644 --- a/bigquery/table_test.go +++ b/bigquery/table_test.go @@ -441,3 +441,64 @@ func TestTableMetadataToUpdateToBQErrors(t *testing.T) { } } } + +func TestTableIdentifiers(t *testing.T) { + testTable := &Table{ + ProjectID: "p", + DatasetID: "d", + TableID: "t", + c: nil, + } + for _, tc := range []struct { + description string + in *Table + format IdentifierFormat + want string + wantErr bool + }{ + { + description: "empty format string", + in: testTable, + format: "", + wantErr: true, + }, + { + description: "legacy", + in: testTable, + format: LegacySQLID, + want: "p:d.t", + }, + { + description: "standard unquoted", + in: testTable, + format: StandardSQLID, + want: "p.d.t", + }, + { + description: "standard w/dash", + in: &Table{ProjectID: "p-p", DatasetID: "d", TableID: "t"}, + format: StandardSQLID, + want: "p-p.d.t", + }, + { + description: "api resource", + in: testTable, + format: StorageAPIResourceID, + want: "projects/p/datasets/d/tables/t", + }, + } { + got, err := tc.in.Identifier(tc.format) + if tc.wantErr && err == nil { + t.Errorf("case %q: wanted err, was success", tc.description) + } + if !tc.wantErr { + if err != nil { + t.Errorf("case %q: wanted success, got err: %v", tc.description, err) + } else { + if got != tc.want { + t.Errorf("case %q: got %s, want %s", tc.description, got, tc.want) + } + } + } + } +} diff --git a/go.mod b/go.mod index 447c93ea400..1921460b67b 100644 --- a/go.mod +++ b/go.mod @@ -9,9 +9,9 @@ require ( github.com/google/martian/v3 v3.2.1 github.com/googleapis/gax-go/v2 v2.1.1 go.opencensus.io v0.23.0 - golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1 + golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 - google.golang.org/api v0.60.0 + google.golang.org/api v0.61.0 google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12 google.golang.org/grpc v1.40.0 google.golang.org/protobuf v1.27.1 diff --git a/go.sum b/go.sum index b12260c491b..13488dfe825 100644 --- a/go.sum +++ b/go.sum @@ -286,8 +286,8 @@ golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1 h1:B333XXssMuKQeBwiNODx4TupZy7bf4sxFZnN2ZOcvUE= -golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 h1:RerP+noqYHUQ8CMRcPlC2nvTa4dcBIjegkuWdcUDuqg= +golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -344,8 +344,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 h1:2B5p2L5IfGiD7+b9BOoRMC6DgObAVZV+Fsp050NqXik= -golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 h1:TyHqChC80pFkXWraUUf6RuB5IqFdQieMLwwCJokV2pc= +golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -444,8 +444,8 @@ google.golang.org/api v0.54.0/go.mod h1:7C4bFFOvVDGXjfDTAsgGwDgAxRDeQ4X8NvUedIt6 google.golang.org/api v0.55.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE= google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE= google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdrMgI= -google.golang.org/api v0.60.0 h1:eq/zs5WPH4J9undYM9IP1O7dSr7Yh8Y0GtSCpzGzIUk= -google.golang.org/api v0.60.0/go.mod h1:d7rl65NZAkEQ90JFzqBjcRq1TVeG5ZoGV3sSpEnnVb4= +google.golang.org/api v0.61.0 h1:TXXKS1slM3b2bZNJwD5DV/Tp6/M2cLzLOLh9PjDhrw8= +google.golang.org/api v0.61.0/go.mod h1:xQRti5UdCmoCEqFxcz93fTl338AVqDgyaDRuOZ3hg9I= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -510,7 +510,7 @@ google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEc google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= -google.golang.org/genproto v0.0.0-20211021150943-2b146023228c/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12 h1:DN5b3HU13J4sMd/QjDx34U6afpaexKTDdop+26pdjdk= google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= diff --git a/internal/generated/snippets/go.mod b/internal/generated/snippets/go.mod index 44ccaea2edb..c4abe946bd1 100644 --- a/internal/generated/snippets/go.mod +++ b/internal/generated/snippets/go.mod @@ -126,7 +126,7 @@ require ( cloud.google.com/go/webrisk v0.1.0 cloud.google.com/go/websecurityscanner v0.1.0 cloud.google.com/go/workflows v0.1.0 - google.golang.org/api v0.60.0 + google.golang.org/api v0.61.0 google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12 ) diff --git a/internal/generated/snippets/go.sum b/internal/generated/snippets/go.sum index 9ae7e91ce12..d7b3bf85db9 100644 --- a/internal/generated/snippets/go.sum +++ b/internal/generated/snippets/go.sum @@ -107,8 +107,9 @@ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAG golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1 h1:B333XXssMuKQeBwiNODx4TupZy7bf4sxFZnN2ZOcvUE= golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 h1:RerP+noqYHUQ8CMRcPlC2nvTa4dcBIjegkuWdcUDuqg= +golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -125,8 +126,9 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 h1:2B5p2L5IfGiD7+b9BOoRMC6DgObAVZV+Fsp050NqXik= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 h1:TyHqChC80pFkXWraUUf6RuB5IqFdQieMLwwCJokV2pc= +golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -151,8 +153,9 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1N golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.54.0/go.mod h1:7C4bFFOvVDGXjfDTAsgGwDgAxRDeQ4X8NvUedIt6z3k= google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE= -google.golang.org/api v0.60.0 h1:eq/zs5WPH4J9undYM9IP1O7dSr7Yh8Y0GtSCpzGzIUk= google.golang.org/api v0.60.0/go.mod h1:d7rl65NZAkEQ90JFzqBjcRq1TVeG5ZoGV3sSpEnnVb4= +google.golang.org/api v0.61.0 h1:TXXKS1slM3b2bZNJwD5DV/Tp6/M2cLzLOLh9PjDhrw8= +google.golang.org/api v0.61.0/go.mod h1:xQRti5UdCmoCEqFxcz93fTl338AVqDgyaDRuOZ3hg9I= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= diff --git a/internal/godocfx/go.sum b/internal/godocfx/go.sum index c3d60922393..00d88dc8a58 100644 --- a/internal/godocfx/go.sum +++ b/internal/godocfx/go.sum @@ -190,8 +190,8 @@ golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4Iltr golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1 h1:B333XXssMuKQeBwiNODx4TupZy7bf4sxFZnN2ZOcvUE= -golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 h1:RerP+noqYHUQ8CMRcPlC2nvTa4dcBIjegkuWdcUDuqg= +golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -228,8 +228,8 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 h1:2B5p2L5IfGiD7+b9BOoRMC6DgObAVZV+Fsp050NqXik= -golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 h1:TyHqChC80pFkXWraUUf6RuB5IqFdQieMLwwCJokV2pc= +golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -288,8 +288,8 @@ google.golang.org/api v0.28.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0M google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc= google.golang.org/api v0.54.0/go.mod h1:7C4bFFOvVDGXjfDTAsgGwDgAxRDeQ4X8NvUedIt6z3k= google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE= -google.golang.org/api v0.60.0 h1:eq/zs5WPH4J9undYM9IP1O7dSr7Yh8Y0GtSCpzGzIUk= -google.golang.org/api v0.60.0/go.mod h1:d7rl65NZAkEQ90JFzqBjcRq1TVeG5ZoGV3sSpEnnVb4= +google.golang.org/api v0.61.0 h1:TXXKS1slM3b2bZNJwD5DV/Tp6/M2cLzLOLh9PjDhrw8= +google.golang.org/api v0.61.0/go.mod h1:xQRti5UdCmoCEqFxcz93fTl338AVqDgyaDRuOZ3hg9I= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -325,7 +325,7 @@ google.golang.org/genproto v0.0.0-20210805201207-89edb61ffb67/go.mod h1:ob2IJxKr google.golang.org/genproto v0.0.0-20210821163610-241b8fcbd6c8/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= -google.golang.org/genproto v0.0.0-20211021150943-2b146023228c/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12 h1:DN5b3HU13J4sMd/QjDx34U6afpaexKTDdop+26pdjdk= google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= diff --git a/storage/bucket.go b/storage/bucket.go index ec7dcb5c322..42888cc49c5 100644 --- a/storage/bucket.go +++ b/storage/bucket.go @@ -44,6 +44,7 @@ type BucketHandle struct { defaultObjectACL ACLHandle conds *BucketConditions userProject string // project for Requester Pays buckets + retry *retryConfig } // Bucket returns a BucketHandle, which provides operations on the named bucket. @@ -95,7 +96,7 @@ func (b *BucketHandle) Create(ctx context.Context, projectID string, attrs *Buck if attrs != nil && attrs.PredefinedDefaultObjectACL != "" { req.PredefinedDefaultObjectAcl(attrs.PredefinedDefaultObjectACL) } - return runWithRetry(ctx, func() error { _, err := req.Context(ctx).Do(); return err }) + return run(ctx, func() error { _, err := req.Context(ctx).Do(); return err }, b.retry, true) } // Delete deletes the Bucket. @@ -107,7 +108,8 @@ func (b *BucketHandle) Delete(ctx context.Context) (err error) { if err != nil { return err } - return runWithRetry(ctx, func() error { return req.Context(ctx).Do() }) + + return run(ctx, func() error { return req.Context(ctx).Do() }, b.retry, true) } func (b *BucketHandle) newDeleteCall() (*raw.BucketsDeleteCall, error) { @@ -156,6 +158,7 @@ func (b *BucketHandle) Object(name string) *ObjectHandle { }, gen: -1, userProject: b.userProject, + retry: b.retry.clone(), } } @@ -169,10 +172,10 @@ func (b *BucketHandle) Attrs(ctx context.Context) (attrs *BucketAttrs, err error return nil, err } var resp *raw.Bucket - err = runWithRetry(ctx, func() error { + err = run(ctx, func() error { resp, err = req.Context(ctx).Do() return err - }) + }, b.retry, true) var e *googleapi.Error if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound { return nil, ErrBucketNotExist @@ -210,12 +213,20 @@ func (b *BucketHandle) Update(ctx context.Context, uattrs BucketAttrsToUpdate) ( if uattrs.PredefinedDefaultObjectACL != "" { req.PredefinedDefaultObjectAcl(uattrs.PredefinedDefaultObjectACL) } - // TODO(jba): retry iff metagen is set? - rb, err := req.Context(ctx).Do() - if err != nil { + + isIdempotent := b.conds != nil && b.conds.MetagenerationMatch != 0 + + var rawBucket *raw.Bucket + call := func() error { + rb, err := req.Context(ctx).Do() + rawBucket = rb + return err + } + + if err := run(ctx, call, b.retry, isIdempotent); err != nil { return nil, err } - return newBucket(rb) + return newBucket(rawBucket) } func (b *BucketHandle) newPatchCall(uattrs *BucketAttrsToUpdate) (*raw.BucketsPatchCall, error) { @@ -282,8 +293,54 @@ func (b *BucketHandle) SignedURL(object string, opts *SignedURLOptions) (string, return SignedURL(b.name, object, newopts) } -// TODO: Add a similar wrapper for GenerateSignedPostPolicyV4 allowing users to -// omit PrivateKey/SignBytes +// GenerateSignedPostPolicyV4 generates a PostPolicyV4 value from bucket, object and opts. +// The generated URL and fields will then allow an unauthenticated client to perform multipart uploads. +// +// This method only requires the Expires field in the specified PostPolicyV4Options +// to be non-nil. If not provided, it attempts to fill the GoogleAccessID and PrivateKey +// from the GOOGLE_APPLICATION_CREDENTIALS environment variable. +// If you are authenticating with a custom HTTP client, Service Account based +// auto-detection will be hindered. +// +// If no private key is found, it attempts to use the GoogleAccessID to sign the URL. +// This requires the IAM Service Account Credentials API to be enabled +// (https://console.developers.google.com/apis/api/iamcredentials.googleapis.com/overview) +// and iam.serviceAccounts.signBlob permissions on the GoogleAccessID service account. +// If you do not want these fields set for you, you may pass them in through opts or use +// GenerateSignedPostPolicyV4(bucket, name string, opts *PostPolicyV4Options) instead. +func (b *BucketHandle) GenerateSignedPostPolicyV4(object string, opts *PostPolicyV4Options) (*PostPolicyV4, error) { + if opts.GoogleAccessID != "" && (opts.SignRawBytes != nil || opts.SignBytes != nil || len(opts.PrivateKey) > 0) { + return GenerateSignedPostPolicyV4(b.name, object, opts) + } + // Make a copy of opts so we don't modify the pointer parameter. + newopts := opts.clone() + + if newopts.GoogleAccessID == "" { + id, err := b.detectDefaultGoogleAccessID() + if err != nil { + return nil, err + } + newopts.GoogleAccessID = id + } + if newopts.SignBytes == nil && newopts.SignRawBytes == nil && len(newopts.PrivateKey) == 0 { + if b.c.creds != nil && len(b.c.creds.JSON) > 0 { + var sa struct { + PrivateKey string `json:"private_key"` + } + err := json.Unmarshal(b.c.creds.JSON, &sa) + if err == nil && sa.PrivateKey != "" { + newopts.PrivateKey = []byte(sa.PrivateKey) + } + } + + // Don't error out if we can't unmarshal the private key from the client, + // fallback to the default sign function for the service account. + if len(newopts.PrivateKey) == 0 { + newopts.SignRawBytes = b.defaultSignBytesFunc(newopts.GoogleAccessID) + } + } + return GenerateSignedPostPolicyV4(b.name, object, newopts) +} func (b *BucketHandle) detectDefaultGoogleAccessID() (string, error) { returnErr := errors.New("no credentials found on client and not on GCE (Google Compute Engine)") @@ -1081,10 +1138,10 @@ func (b *BucketHandle) LockRetentionPolicy(ctx context.Context) error { metageneration = b.conds.MetagenerationMatch } req := b.c.raw.Buckets.LockRetentionPolicy(b.name, metageneration) - return runWithRetry(ctx, func() error { + return run(ctx, func() error { _, err := req.Context(ctx).Do() return err - }) + }, b.retry, true) } // applyBucketConds modifies the provided call using the conditions in conds. @@ -1367,6 +1424,21 @@ func (b *BucketHandle) Objects(ctx context.Context, q *Query) *ObjectIterator { return it } +// Retryer returns a bucket handle that is configured with custom retry +// behavior as specified by the options that are passed to it. All operations +// on the new handle will use the customized retry configuration. +// Retry options set on a object handle will take precedence over options set on +// the bucket handle. +func (b *BucketHandle) Retryer(opts ...RetryOption) *BucketHandle { + b2 := *b + retry := &retryConfig{} + for _, opt := range opts { + opt.apply(retry) + } + b2.retry = retry + return &b2 +} + // An ObjectIterator is an iterator over ObjectAttrs. // // Note: This iterator is not safe for concurrent operations without explicit synchronization. diff --git a/storage/bucket_test.go b/storage/bucket_test.go index 0e65e8becdf..2b4491766fc 100644 --- a/storage/bucket_test.go +++ b/storage/bucket_test.go @@ -23,6 +23,7 @@ import ( "cloud.google.com/go/internal/testutil" "github.com/google/go-cmp/cmp" + gax "github.com/googleapis/gax-go/v2" "google.golang.org/api/googleapi" raw "google.golang.org/api/storage/v1" ) @@ -717,3 +718,90 @@ func TestNewBucket(t *testing.T) { t.Errorf("got=-, want=+:\n%s", diff) } } + +func TestBucketRetryer(t *testing.T) { + testCases := []struct { + name string + call func(b *BucketHandle) *BucketHandle + want *retryConfig + }{ + { + name: "all defaults", + call: func(b *BucketHandle) *BucketHandle { + return b.Retryer() + }, + want: &retryConfig{}, + }, + { + name: "set all options", + call: func(b *BucketHandle) *BucketHandle { + return b.Retryer( + WithBackoff(gax.Backoff{ + Initial: 2 * time.Second, + Max: 30 * time.Second, + Multiplier: 3, + }), + WithPolicy(RetryAlways), + WithErrorFunc(func(err error) bool { return false })) + }, + want: &retryConfig{ + backoff: &gax.Backoff{ + Initial: 2 * time.Second, + Max: 30 * time.Second, + Multiplier: 3, + }, + policy: RetryAlways, + shouldRetry: func(err error) bool { return false }, + }, + }, + { + name: "set some backoff options", + call: func(b *BucketHandle) *BucketHandle { + return b.Retryer( + WithBackoff(gax.Backoff{ + Multiplier: 3, + })) + }, + want: &retryConfig{ + backoff: &gax.Backoff{ + Multiplier: 3, + }}, + }, + { + name: "set policy only", + call: func(b *BucketHandle) *BucketHandle { + return b.Retryer(WithPolicy(RetryNever)) + }, + want: &retryConfig{ + policy: RetryNever, + }, + }, + { + name: "set ErrorFunc only", + call: func(b *BucketHandle) *BucketHandle { + return b.Retryer( + WithErrorFunc(func(err error) bool { return false })) + }, + want: &retryConfig{ + shouldRetry: func(err error) bool { return false }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(s *testing.T) { + b := tc.call(&BucketHandle{}) + if diff := cmp.Diff( + b.retry, + tc.want, + cmp.AllowUnexported(retryConfig{}, gax.Backoff{}), + // ErrorFunc cannot be compared directly, but we check if both are + // either nil or non-nil. + cmp.Comparer(func(a, b func(err error) bool) bool { + return (a == nil && b == nil) || (a != nil && b != nil) + }), + ); diff != "" { + s.Fatalf("retry not configured correctly: %v", diff) + } + }) + } +} diff --git a/storage/copy.go b/storage/copy.go index 61983df5ada..f180753520c 100644 --- a/storage/copy.go +++ b/storage/copy.go @@ -138,8 +138,11 @@ func (c *Copier) callRewrite(ctx context.Context, rawObj *raw.Object) (*raw.Rewr var res *raw.RewriteResponse var err error setClientHeader(call.Header()) - err = runWithRetry(ctx, func() error { res, err = call.Do(); return err }) - if err != nil { + + retryCall := func() error { res, err = call.Do(); return err } + isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist) + + if err := run(ctx, retryCall, c.dst.retry, isIdempotent); err != nil { return nil, err } c.RewriteToken = res.RewriteToken @@ -230,8 +233,11 @@ func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) { } var obj *raw.Object setClientHeader(call.Header()) - err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err }) - if err != nil { + + retryCall := func() error { obj, err = call.Do(); return err } + isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist) + + if err := run(ctx, retryCall, c.dst.retry, isIdempotent); err != nil { return nil, err } return newObject(obj), nil diff --git a/storage/go.mod b/storage/go.mod index dc5a4e8e795..61685fdd03a 100644 --- a/storage/go.mod +++ b/storage/go.mod @@ -7,10 +7,10 @@ require ( github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.6 github.com/googleapis/gax-go/v2 v2.1.1 - golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1 + golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 - google.golang.org/api v0.60.0 - google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 + google.golang.org/api v0.61.0 + google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12 google.golang.org/grpc v1.40.0 google.golang.org/protobuf v1.27.1 ) diff --git a/storage/go.sum b/storage/go.sum index cb0cc2607ea..7326151b7f1 100644 --- a/storage/go.sum +++ b/storage/go.sum @@ -286,8 +286,8 @@ golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1 h1:B333XXssMuKQeBwiNODx4TupZy7bf4sxFZnN2ZOcvUE= -golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 h1:RerP+noqYHUQ8CMRcPlC2nvTa4dcBIjegkuWdcUDuqg= +golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -344,8 +344,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 h1:2B5p2L5IfGiD7+b9BOoRMC6DgObAVZV+Fsp050NqXik= -golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 h1:TyHqChC80pFkXWraUUf6RuB5IqFdQieMLwwCJokV2pc= +golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -444,8 +444,8 @@ google.golang.org/api v0.54.0/go.mod h1:7C4bFFOvVDGXjfDTAsgGwDgAxRDeQ4X8NvUedIt6 google.golang.org/api v0.55.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE= google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE= google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdrMgI= -google.golang.org/api v0.60.0 h1:eq/zs5WPH4J9undYM9IP1O7dSr7Yh8Y0GtSCpzGzIUk= -google.golang.org/api v0.60.0/go.mod h1:d7rl65NZAkEQ90JFzqBjcRq1TVeG5ZoGV3sSpEnnVb4= +google.golang.org/api v0.61.0 h1:TXXKS1slM3b2bZNJwD5DV/Tp6/M2cLzLOLh9PjDhrw8= +google.golang.org/api v0.61.0/go.mod h1:xQRti5UdCmoCEqFxcz93fTl338AVqDgyaDRuOZ3hg9I= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -510,9 +510,9 @@ google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEc google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= -google.golang.org/genproto v0.0.0-20211021150943-2b146023228c/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= -google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 h1:b9mVrqYfq3P4bCdaLg1qtBnPzUYgglsIdjZkL/fQVOE= google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12 h1:DN5b3HU13J4sMd/QjDx34U6afpaexKTDdop+26pdjdk= +google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= diff --git a/storage/integration_test.go b/storage/integration_test.go index 6d3567d0c2a..b0ebf509643 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -4142,7 +4142,7 @@ func TestIntegration_PostPolicyV4(t *testing.T) { object := b.Object(objectName) defer h.mustDeleteObject(object) - pv4, err := GenerateSignedPostPolicyV4(newBucketName, objectName, opts) + pv4, err := b.GenerateSignedPostPolicyV4(objectName, opts) if err != nil { t.Fatal(err) } @@ -4248,6 +4248,78 @@ func TestIntegration_SignedURL_Bucket(t *testing.T) { } } +func TestIntegration_PostPolicyV4_Bucket(t *testing.T) { + h := testHelper{t} + ctx := context.Background() + + if testing.Short() && !replaying { + t.Skip("Integration tests skipped in short mode") + } + + // We explictly send the key to the client to sign with the private key + clientWithCredentials := newTestClientWithExplicitCredentials(ctx, t) + defer clientWithCredentials.Close() + + // Create another client to test the sign byte function as well + clientWithoutPrivateKey := testConfig(ctx, t, ScopeFullControl, "https://www.googleapis.com/auth/cloud-platform") + defer clientWithoutPrivateKey.Close() + + jwt, err := testutil.JWTConfig() + if err != nil { + t.Fatalf("unable to find test credentials: %v", err) + } + + statusCodeToRespond := 200 + + for _, test := range []struct { + desc string + opts PostPolicyV4Options + client *Client + }{ + { + desc: "signing with the private key", + opts: PostPolicyV4Options{ + Expires: time.Now().Add(30 * time.Minute), + + Fields: &PolicyV4Fields{ + StatusCodeOnSuccess: statusCodeToRespond, + ContentType: "text/plain", + ACL: "public-read", + }, + }, + client: clientWithCredentials, + }, + { + desc: "signing with the default sign bytes func", + opts: PostPolicyV4Options{ + Expires: time.Now().Add(30 * time.Minute), + GoogleAccessID: jwt.Email, + Fields: &PolicyV4Fields{ + StatusCodeOnSuccess: statusCodeToRespond, + ContentType: "text/plain", + ACL: "public-read", + }, + }, + client: clientWithoutPrivateKey, + }, + } { + t.Run(test.desc, func(t *testing.T) { + objectName := uidSpace.New() + object := test.client.Bucket(bucketName).Object(objectName) + defer h.mustDeleteObject(object) + + pv4, err := test.client.Bucket(bucketName).GenerateSignedPostPolicyV4(objectName, &test.opts) + if err != nil { + t.Fatal(err) + } + + if err := verifyPostPolicy(pv4, object, bytes.Repeat([]byte("a"), 25), statusCodeToRespond); err != nil { + t.Fatal(err) + } + }) + } +} + // Tests that the same SignBytes function works for both // SignRawBytes on GeneratePostPolicyV4 and SignBytes on SignedURL func TestIntegration_PostPolicyV4_SignedURL_WithSignBytes(t *testing.T) { @@ -4263,7 +4335,7 @@ func TestIntegration_PostPolicyV4_SignedURL_WithSignBytes(t *testing.T) { h := testHelper{t} projectID := testutil.ProjID() bucketName := uidSpace.New() - objectName := "my-object.txt" + objectName := uidSpace.New() fileBody := bytes.Repeat([]byte("b"), 25) bucket := client.Bucket(bucketName) @@ -4491,7 +4563,7 @@ func (h testHelper) mustObjectAttrs(o *ObjectHandle) *ObjectAttrs { func (h testHelper) mustDeleteObject(o *ObjectHandle) { if err := o.Delete(context.Background()); err != nil { - h.t.Fatalf("%s: object delete: %v", loc(), err) + h.t.Fatalf("%s: delete object %s from bucket %s: %v", loc(), o.ObjectName(), o.BucketName(), err) } } diff --git a/storage/post_policy_v4.go b/storage/post_policy_v4.go index 5f418c3246b..7e972101015 100644 --- a/storage/post_policy_v4.go +++ b/storage/post_policy_v4.go @@ -116,6 +116,21 @@ type PostPolicyV4Options struct { shouldHashSignBytes bool } +func (opts *PostPolicyV4Options) clone() *PostPolicyV4Options { + return &PostPolicyV4Options{ + GoogleAccessID: opts.GoogleAccessID, + PrivateKey: opts.PrivateKey, + SignBytes: opts.SignBytes, + SignRawBytes: opts.SignRawBytes, + Expires: opts.Expires, + Style: opts.Style, + Insecure: opts.Insecure, + Fields: opts.Fields, + Conditions: opts.Conditions, + shouldHashSignBytes: opts.shouldHashSignBytes, + } +} + // PolicyV4Fields describes the attributes for a PostPolicyV4 request. type PolicyV4Fields struct { // ACL specifies the access control permissions for the object. diff --git a/storage/storage.go b/storage/storage.go index 798dd4598eb..07ba64cc9e9 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -1790,9 +1790,18 @@ func setConditionField(call reflect.Value, name string, value interface{}) bool // Retryer returns an object handle that is configured with custom retry // behavior as specified by the options that are passed to it. All operations // on the new handle will use the customized retry configuration. +// These retry options will merge with the bucket's retryer (if set) for the +// returned handle. Options passed into this method will take precedence over +// options on the bucket's retryer. func (o *ObjectHandle) Retryer(opts ...RetryOption) *ObjectHandle { o2 := *o - retry := &retryConfig{} + var retry *retryConfig + if o.retry != nil { + // merge the options with the existing retry + retry = o.retry + } else { + retry = &retryConfig{} + } for _, opt := range opts { opt.apply(retry) } @@ -1897,6 +1906,27 @@ type retryConfig struct { shouldRetry func(err error) bool } +func (r *retryConfig) clone() *retryConfig { + if r == nil { + return nil + } + + var bo *gax.Backoff + if r.backoff != nil { + bo = &gax.Backoff{ + Initial: r.backoff.Initial, + Max: r.backoff.Max, + Multiplier: r.backoff.Multiplier, + } + } + + return &retryConfig{ + backoff: bo, + policy: r.policy, + shouldRetry: r.shouldRetry, + } +} + // composeSourceObj wraps a *raw.ComposeRequestSourceObjects, but adds the methods // that modifyCall searches for by name. type composeSourceObj struct { diff --git a/storage/storage_test.go b/storage/storage_test.go index dafc9c3d22c..b4f7a4ef429 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -788,7 +788,7 @@ func TestConditionErrors(t *testing.T) { // Test that ObjectHandle.Retryer correctly configures the retry configuration // in the ObjectHandle. -func TestRetryer(t *testing.T) { +func TestObjectRetryer(t *testing.T) { testCases := []struct { name string call func(o *ObjectHandle) *ObjectHandle @@ -875,6 +875,154 @@ func TestRetryer(t *testing.T) { } } +// Test the interactions between ObjectHandle and BucketHandle Retryers and that +// they correctly configure the retry configuration +func TestRetryer(t *testing.T) { + testCases := []struct { + name string + bucketOptions []RetryOption + objectOptions []RetryOption + want *retryConfig + }{ + { + name: "no retries", + want: nil, + }, + { + name: "object retryer configures retry", + objectOptions: []RetryOption{ + WithPolicy(RetryAlways), + WithErrorFunc(shouldRetry), + }, + want: &retryConfig{ + shouldRetry: shouldRetry, + policy: RetryAlways, + }, + }, + { + name: "bucket retryer configures retry", + bucketOptions: []RetryOption{ + WithBackoff(gax.Backoff{ + Initial: time.Minute, + Max: time.Hour, + Multiplier: 6, + }), + WithPolicy(RetryAlways), + WithErrorFunc(shouldRetry), + }, + want: &retryConfig{ + backoff: &gax.Backoff{ + Initial: time.Minute, + Max: time.Hour, + Multiplier: 6, + }, + shouldRetry: shouldRetry, + policy: RetryAlways, + }, + }, + { + name: "object retryer overrides bucket retryer", + bucketOptions: []RetryOption{ + WithPolicy(RetryAlways), + }, + objectOptions: []RetryOption{ + WithPolicy(RetryNever), + WithErrorFunc(shouldRetry), + }, + want: &retryConfig{ + policy: RetryNever, + shouldRetry: shouldRetry, + }, + }, + { + name: "object retryer overrides bucket retryer backoff options", + bucketOptions: []RetryOption{ + WithBackoff(gax.Backoff{ + Initial: time.Minute, + Max: time.Hour, + Multiplier: 6, + }), + }, + objectOptions: []RetryOption{ + WithBackoff(gax.Backoff{ + Initial: time.Nanosecond, + Max: time.Microsecond, + }), + }, + want: &retryConfig{ + backoff: &gax.Backoff{ + Initial: time.Nanosecond, + Max: time.Microsecond, + }, + }, + }, + { + name: "object retryer does not override bucket retryer if option is not set", + bucketOptions: []RetryOption{ + WithPolicy(RetryNever), + WithErrorFunc(shouldRetry), + }, + objectOptions: []RetryOption{ + WithBackoff(gax.Backoff{ + Initial: time.Nanosecond, + Max: time.Second, + }), + }, + want: &retryConfig{ + policy: RetryNever, + shouldRetry: shouldRetry, + backoff: &gax.Backoff{ + Initial: time.Nanosecond, + Max: time.Second, + }, + }, + }, + { + name: "object's backoff completely overwrites bucket's backoff", + bucketOptions: []RetryOption{ + WithBackoff(gax.Backoff{ + Initial: time.Hour, + }), + }, + objectOptions: []RetryOption{ + WithBackoff(gax.Backoff{ + Multiplier: 4, + }), + }, + want: &retryConfig{ + backoff: &gax.Backoff{ + Multiplier: 4, + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(s *testing.T) { + b := &BucketHandle{} + if len(tc.bucketOptions) > 0 { + b = b.Retryer(tc.bucketOptions...) + } + o := b.Object("obj") + if len(tc.objectOptions) > 0 { + o = o.Retryer(tc.objectOptions...) + } + + if diff := cmp.Diff( + o.retry, + tc.want, + cmp.AllowUnexported(retryConfig{}, gax.Backoff{}), + // ErrorFunc cannot be compared directly, but we check if both are + // either nil or non-nil. + cmp.Comparer(func(a, b func(err error) bool) bool { + return (a == nil && b == nil) || (a != nil && b != nil) + }), + ); diff != "" { + s.Fatalf("retry not configured correctly: %v", diff) + } + }) + } +} + // Test object compose. func TestObjectCompose(t *testing.T) { t.Parallel()