Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery/storage/managedwriter): improve error communication #6360

Merged
merged 8 commits into from Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions bigquery/go.mod
Expand Up @@ -9,14 +9,14 @@ require (
cloud.google.com/go/storage v1.23.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.8
github.com/googleapis/gax-go/v2 v2.4.0
github.com/googleapis/gax-go/v2 v2.5.1
go.opencensus.io v0.23.0
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f
google.golang.org/api v0.90.0
google.golang.org/genproto v0.0.0-20220802133213-ce4fa296bf78
google.golang.org/grpc v1.48.0
google.golang.org/protobuf v1.28.0
google.golang.org/protobuf v1.28.1
)

require (
Expand Down
7 changes: 5 additions & 2 deletions bigquery/go.sum
Expand Up @@ -181,8 +181,9 @@ github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pf
github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0eJc8R6ouapiM=
github.com/googleapis/gax-go/v2 v2.2.0/go.mod h1:as02EH8zWkzwUoLbBaFeQ+arQaj/OthfcblKl4IGNaM=
github.com/googleapis/gax-go/v2 v2.3.0/go.mod h1:b8LNqSzNabLiUpXKkY7HAR5jr6bIT99EXz9pXxye9YM=
github.com/googleapis/gax-go/v2 v2.4.0 h1:dS9eYAjhrE2RjmzYw2XAPvcXfmcQLtFEQWn0CR82awk=
github.com/googleapis/gax-go/v2 v2.4.0/go.mod h1:XOTVJ59hdnfJLIP/dh8n5CGryZR2LxK9wbMD5+iXC6c=
github.com/googleapis/gax-go/v2 v2.5.1 h1:kBRZU0PSuI7PspsSb/ChWoVResUcwNVIdpB049pKTiw=
github.com/googleapis/gax-go/v2 v2.5.1/go.mod h1:h6B0KMMFNtI2ddbGJn3T3ZbwkeT6yqEF02fYlzkUCyo=
github.com/googleapis/go-type-adapters v1.0.0 h1:9XdMn+d/G57qq1s8dNc5IesGCXHf6V2HZ2JwRxfA2tA=
github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
Expand Down Expand Up @@ -604,6 +605,7 @@ google.golang.org/genproto v0.0.0-20220608133413-ed9918b62aac/go.mod h1:KEWEmljW
google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220722212130-b98a9ff5e252/go.mod h1:GkXuJDJ6aQ7lnJcRF+SJVgFdQhypqgl3LB1C9vabdRE=
google.golang.org/genproto v0.0.0-20220802133213-ce4fa296bf78 h1:QntLWYqZeuBtJkth3m/6DLznnI0AHJr+AgJXvVh/izw=
google.golang.org/genproto v0.0.0-20220802133213-ce4fa296bf78/go.mod h1:iHe1svFLAZg9VWz891+QbRMwUv9O/1Ww+/mngYeThbc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
Expand Down Expand Up @@ -653,8 +655,9 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
Expand Down
28 changes: 28 additions & 0 deletions bigquery/storage/managedwriter/doc.go
Expand Up @@ -170,5 +170,33 @@ have been finalized, meaning they'll no longer allow further data writes.
// Using the client, we can commit data from multple streams to the same
// table atomically.
resp, err := client.BatchCommitWriteStreams(ctx, req)

# Error Handling

Like other Google Cloud services, this API relies on common components that can provide an
enhanced set of errors when communicating about the results of API interactions.

Specifically, the apierror package (https://pkg.go.dev/github.com/googleapis/gax-go/v2/apierror)
provides convenience methods for extracting structured information about errors.

Additionally, the BigQuery Storage API will augment applicable errors with additional
service-specific details in the form of a StorageError message. Please note that despite the
name, this protocol buffer message does not implement Go's error interface. By leveraging
ExtractProtoMessage, which is provided as part of the apierror packed, you can retrieve
service-specific details.

quartzmo marked this conversation as resolved.
Show resolved Hide resolved
// By way of example, let's assume the response from an append call returns an error.
_, err := result.GetResult(ctx)
if err != nil {
if apiErr, ok := apierror.FromError(err); ok {
// We now have an instance of APIError, which directly exposes more specific
// details about multiple failure conditions include transport-level errors.
storageErr := &storagepb.StorageError{}
if e := apiErr.Details().ExtractProtoMessage(storageErr); e != nil {
// storageErr now contains service-specific information about the error.
log.Printf("Received service-specific error code %s", storageErr.GetCode().String())
}
}
}
*/
package managedwriter
122 changes: 122 additions & 0 deletions bigquery/storage/managedwriter/integration_test.go
Expand Up @@ -139,6 +139,10 @@ func TestIntegration_ManagedWriter(t *testing.T) {
t.Parallel()
testCommittedStream(ctx, t, mwClient, bqClient, dataset)
})
t.Run("ErrorBehaviors", func(t *testing.T) {
t.Parallel()
testErrorBehaviors(ctx, t, mwClient, bqClient, dataset)
})
t.Run("BufferedStream", func(t *testing.T) {
t.Parallel()
testBufferedStream(ctx, t, mwClient, bqClient, dataset)
Expand Down Expand Up @@ -404,6 +408,124 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq
withExactRowCount(int64(len(testSimpleData))))
}

// testErrorBehaviors intentionally issues problematic requests to verify error behaviors.
func testErrorBehaviors(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}

m := &testdata.SimpleMessageProto2{}
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

// setup a new stream.
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(CommittedStream),
WithSchemaDescriptor(descriptorProto),
)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

data := make([][]byte, len(testSimpleData))
for k, mesg := range testSimpleData {
b, err := proto.Marshal(mesg)
if err != nil {
t.Errorf("failed to marshal message %d: %v", k, err)
}
data[k] = b
}

// Send an append at an invalid offset.
result, err := ms.AppendRows(ctx, data, WithOffset(99))
if err != nil {
t.Errorf("failed to send append: %v", err)
}
//
off, err := result.GetResult(ctx)
if err == nil {
t.Errorf("expected error, got offset %d", off)
}

apiErr, ok := apierror.FromError(err)
if !ok {
t.Errorf("expected apierror, got %T: %v", err, err)
}
se := &storagepb.StorageError{}
e := apiErr.Details().ExtractProtoMessage(se)
if e != nil {
t.Errorf("expected storage error, but extraction failed: %v", e)
}
wantCode := storagepb.StorageError_OFFSET_OUT_OF_RANGE
if se.GetCode() != wantCode {
t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
}
// Send "real" append to advance the offset.
result, err = ms.AppendRows(ctx, data, WithOffset(0))
if err != nil {
t.Errorf("failed to send append: %v", err)
}
off, err = result.GetResult(ctx)
if err != nil {
t.Errorf("expected offset, got error %v", err)
}
wantOffset := int64(0)
if off != wantOffset {
t.Errorf("offset mismatch, got %d want %d", off, wantOffset)
}
// Now, send at the start offset again.
result, err = ms.AppendRows(ctx, data, WithOffset(0))
if err != nil {
t.Errorf("failed to send append: %v", err)
}
off, err = result.GetResult(ctx)
if err == nil {
t.Errorf("expected error, got offset %d", off)
}
apiErr, ok = apierror.FromError(err)
if !ok {
t.Errorf("expected apierror, got %T: %v", err, err)
}
se = &storagepb.StorageError{}
e = apiErr.Details().ExtractProtoMessage(se)
if e != nil {
t.Errorf("expected storage error, but extraction failed: %v", e)
}
wantCode = storagepb.StorageError_OFFSET_ALREADY_EXISTS
if se.GetCode() != wantCode {
t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
}
// Finalize the stream.
if _, err := ms.Finalize(ctx); err != nil {
t.Errorf("Finalize had error: %v", err)
}
// Send another append, which is disallowed for finalized streams.
result, err = ms.AppendRows(ctx, data)
if err != nil {
t.Errorf("failed to send append: %v", err)
}
off, err = result.GetResult(ctx)
if err == nil {
t.Errorf("expected error, got offset %d", off)
}
apiErr, ok = apierror.FromError(err)
if !ok {
t.Errorf("expected apierror, got %T: %v", err, err)
}
se = &storagepb.StorageError{}
e = apiErr.Details().ExtractProtoMessage(se)
if e != nil {
t.Errorf("expected storage error, but extraction failed: %v", e)
}
wantCode = storagepb.StorageError_STREAM_FINALIZED
if se.GetCode() != wantCode {
t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
}
}

func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
Expand Down