feat(bigquery/storage/managedwriter): improve error communication #6360

Merged: 8 commits, Aug 11, 2022
101 changes: 62 additions & 39 deletions bigquery/storage/managedwriter/doc.go
@@ -24,8 +24,7 @@ feature-rich successor to the classic BigQuery streaming interface, which is presented
in cloud.google.com/go/bigquery, and the tabledata.insertAll method if you're more familiar with the BigQuery v2 REST
methods.


# Creating a Client

To start working with this package, create a client:

@@ -35,8 +34,7 @@ To start working with this package, create a client:
// TODO: Handle error.
}


# Defining the Protocol Buffer Schema

The write functionality of BigQuery Storage requires data to be sent as encoded
protocol buffer messages in proto2 wire format. As the protocol buffer is not
@@ -70,7 +68,7 @@ contains functionality to normalize the descriptor into a self-contained definition
The adapt subpackage also contains functionality for generating a DescriptorProto using
a BigQuery table's schema directly.
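A minimal sketch of that normalization step, assuming a compiled message type analogous to the myprotopackage.MyCompiledMessage used elsewhere in this doc:

```go
// Derive a self-contained DescriptorProto from a compiled message,
// suitable for passing to WithSchemaDescriptor.
m := &myprotopackage.MyCompiledMessage{}
descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
if err != nil {
	// TODO: Handle error.
}
```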

# Constructing a ManagedStream

The ManagedStream handles management of the underlying write connection to the BigQuery
Storage service. You can either create a write session explicitly and pass it in, or
@@ -102,7 +100,7 @@ In addition, NewManagedStream can create new streams implicitly:
// TODO: Handle error.
}

# Writing Data

Use the AppendRows function to write one or more serialized proto messages to a stream. You
can choose to specify an offset in the stream to handle de-duplication for user-created streams,
@@ -111,42 +109,40 @@ but a "default" stream neither accepts nor reports offsets.
AppendRows returns a future-like object that blocks until the write is successful or yields
an error.

// Define a couple of messages.
mesgs := []*myprotopackage.MyCompiledMessage{
	{
		UserName:        proto.String("johndoe"),
		EmailAddress:    proto.String("jd@mycompany.mydomain"),
		FavoriteNumbers: []int64{1, 42, 12345},
	},
	{
		UserName:        proto.String("janesmith"),
		EmailAddress:    proto.String("smith@othercompany.otherdomain"),
		FavoriteNumbers: []int64{1, 3, 5, 7, 9},
	},
}

// Encode the messages into binary format.
encoded := make([][]byte, len(mesgs))
for k, v := range mesgs {
	b, err := proto.Marshal(v)
	if err != nil {
		// TODO: Handle error.
	}
	encoded[k] = b
}

// Send the rows to the service, and specify an offset for managing deduplication.
result, err := managedStream.AppendRows(ctx, encoded, WithOffset(0))

// Block until the write is complete and return the result.
returnedOffset, err := result.GetResult(ctx)
if err != nil {
	// TODO: Handle error.
}


# Buffered Stream Management

For Buffered streams, users control when data is made visible in the destination table/stream
independently of when it is written. Use FlushRows on the ManagedStream to advance the flush
@@ -156,12 +152,11 @@ point ahead in the stream.
// ahead to make the first 1000 rows available.
flushOffset, err := managedStream.FlushRows(ctx, 1000)

# Pending Stream Management

Pending streams allow users to commit data from multiple streams together once the streams
have been finalized, meaning they'll no longer allow further data writes.


// First, finalize the stream we're writing into.
totalRows, err := managedStream.Finalize(ctx)
if err != nil {
@@ -176,5 +171,33 @@ have been finalized, meaning they'll no longer allow further data writes.
// table atomically.
resp, err := client.BatchCommitWriteStreams(ctx, req)
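The collapsed hunk above elides how req is built; a sketch of such a request, with a placeholder parent table name for illustration:

```go
// Identify the finalized stream(s) to commit into the destination table.
req := &storagepb.BatchCommitWriteStreamsRequest{
	Parent:       "projects/myproject/datasets/mydataset/tables/mytable",
	WriteStreams: []string{managedStream.StreamName()},
}
```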

# Error Handling

Like other Google Cloud services, this API relies on common components that can provide an
enhanced set of errors when communicating about the results of API interactions.

Specifically, the apierror package (https://pkg.go.dev/github.com/googleapis/gax-go/v2/apierror)
provides convenience methods for extracting structured information about errors.

// By way of example, let's assume the response from an append call returns an error.
_, err := result.GetResult(ctx)
if err != nil {
if apiErr, ok := apierror.FromError(err); ok {
// We now have an instance of APIError, which directly exposes more specific
// details about multiple failure conditions.
log.Printf("result had status %v", apiErr.GRPCStatus())
}
}

Additionally, the service defines a specific StorageError type that has service-specific details
about errors. Please note that StorageError does not implement Go's error interface. The
StorageErrorFromError() function can be used to extract a StorageError from an error returned
by the service.

// Once again, let's assume the response from an append call returns an error.
_, err := result.GetResult(ctx)
if se := StorageErrorFromError(err); se != nil {
	log.Printf("storage error code was %s, message was %s", se.GetCode().String(), se.GetErrorMessage())
}
*/
package managedwriter
106 changes: 106 additions & 0 deletions bigquery/storage/managedwriter/integration_test.go
@@ -140,6 +140,10 @@ func TestIntegration_ManagedWriter(t *testing.T) {
t.Parallel()
testCommittedStream(ctx, t, mwClient, bqClient, dataset)
})
t.Run("ErrorBehaviors", func(t *testing.T) {
t.Parallel()
testErrorBehaviors(ctx, t, mwClient, bqClient, dataset)
})
t.Run("BufferedStream", func(t *testing.T) {
t.Parallel()
testBufferedStream(ctx, t, mwClient, bqClient, dataset)
@@ -405,6 +409,108 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq
withExactRowCount(int64(len(testSimpleData))))
}

// testErrorBehaviors intentionally issues problematic requests to verify error behaviors.
func testErrorBehaviors(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}

m := &testdata.SimpleMessageProto2{}
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

// Set up a new stream.
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(CommittedStream),
WithSchemaDescriptor(descriptorProto),
)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

data := make([][]byte, len(testSimpleData))
for k, mesg := range testSimpleData {
b, err := proto.Marshal(mesg)
if err != nil {
t.Errorf("failed to marshal message %d: %v", k, err)
}
data[k] = b
}

// Send an append at an invalid offset.
result, err := ms.AppendRows(ctx, data, WithOffset(99))
if err != nil {
t.Errorf("failed to send append: %v", err)
}
// The append at an invalid offset should yield a storage error rather than an offset.
off, err := result.GetResult(ctx)
if err == nil {
t.Errorf("expected error, got offset %d", off)
}
se := StorageErrorFromError(err)
if se == nil {
t.Errorf("expected StorageError from response, got nil")
}
wantCode := storagepb.StorageError_OFFSET_OUT_OF_RANGE
if se.GetCode() != wantCode {
t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
}
// Send "real" append to advance the offset.
result, err = ms.AppendRows(ctx, data, WithOffset(0))
if err != nil {
t.Errorf("failed to send append: %v", err)
}
off, err = result.GetResult(ctx)
if err != nil {
t.Errorf("expected offset, got error %v", err)
}
wantOffset := int64(0)
if off != wantOffset {
t.Errorf("offset mismatch, got %d want %d", off, wantOffset)
}
// Now, send at the start offset again.
result, err = ms.AppendRows(ctx, data, WithOffset(0))
if err != nil {
t.Errorf("failed to send append: %v", err)
}
off, err = result.GetResult(ctx)
if err == nil {
t.Errorf("expected error, got offset %d", off)
}
se = StorageErrorFromError(err)
if se == nil {
t.Errorf("expected StorageError from response, got nil")
}
wantCode = storagepb.StorageError_OFFSET_ALREADY_EXISTS
if se.GetCode() != wantCode {
t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
}
// Finalize the stream.
if _, err := ms.Finalize(ctx); err != nil {
t.Errorf("Finalize had error: %v", err)
}
// Send another append, which is disallowed for finalized streams.
result, err = ms.AppendRows(ctx, data)
if err != nil {
t.Errorf("failed to send append: %v", err)
}
off, err = result.GetResult(ctx)
if err == nil {
t.Errorf("expected error, got offset %d", off)
}
se = StorageErrorFromError(err)
if se == nil {
t.Errorf("expected StorageError from response, got nil")
}
wantCode = storagepb.StorageError_STREAM_FINALIZED
if se.GetCode() != wantCode {
t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
}
}

func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
34 changes: 34 additions & 0 deletions bigquery/storage/managedwriter/storageerror.go
@@ -0,0 +1,34 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/grpc/status"
)

// StorageErrorFromError attempts to extract a storage error if it is present.
func StorageErrorFromError(err error) *storagepb.StorageError {
status, ok := status.FromError(err)
if !ok {
return nil
}
for _, detail := range status.Details() {
if se, ok := detail.(*storagepb.StorageError); ok {
return se
}
}
return nil
}
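For context on why this extraction works: the service attaches the StorageError as a detail message on the gRPC status. A sketch of round-tripping one (assuming an additional import of google.golang.org/grpc/codes):

```go
// Build a status carrying a StorageError detail, mimicking the service.
st := status.New(codes.InvalidArgument, "offset out of range")
st, err := st.WithDetails(&storagepb.StorageError{
	Code: storagepb.StorageError_OFFSET_OUT_OF_RANGE,
})
if err != nil {
	// TODO: Handle error.
}
// Extraction recovers the typed error from the wrapped status.
se := StorageErrorFromError(st.Err())
```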