Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery/storage/managedwriter): more metrics instrumentation #4690

Merged
merged 8 commits into from Aug 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
95 changes: 74 additions & 21 deletions bigquery/storage/managedwriter/instrumentation.go
Expand Up @@ -31,69 +31,122 @@ var (
// We allow users to annotate streams with a data origin for monitoring purposes.
// See the WithDataOrigin writer option for providing this.
keyDataOrigin = tag.MustNewKey("dataOrigin")

// keyError tags metrics using the status code of returned errors.
keyError = tag.MustNewKey("error")
)

// DefaultOpenCensusViews retains the set of all opencensus views that this
// library has instrumented, to add view registration for exporters.
var DefaultOpenCensusViews []*view.View

const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/"

var (
// AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)

// AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)

// AppendRequests is a measure of the number of append requests sent.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)

// AppendBytes is a measure of the bytes sent as append requests.
// AppendRequestBytes is a measure of the bytes sent as append requests.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestBytes = stats.Int64(statsPrefix+"append_request_bytes", "Number of bytes sent as append requests", stats.UnitBytes)

// AppendRequestErrors is a measure of the number of append requests that errored on send.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestErrors = stats.Int64(statsPrefix+"append_request_errors", "Number of append requests that yielded immediate error", stats.UnitDimensionless)

// AppendRequestRows is a measure of the number of append rows sent.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendBytes = stats.Int64(statsPrefix+"append_bytes", "Number of bytes sent as append requests", stats.UnitBytes)
AppendRequestRows = stats.Int64(statsPrefix+"append_rows", "Number of append rows sent", stats.UnitDimensionless)

// AppendResponses is a measure of the number of append responses received.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless)

// AppendResponseErrors is a measure of the number of append responses received with an error attached.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless)

// FlushRequests is a measure of the number of FlushRows requests sent.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless)
)

// AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened.
var (

// AppendClientOpenView is a cumulative sum of AppendClientOpenCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)
AppendClientOpenView *view.View

// AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried.
// AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)
)
AppendClientOpenRetryView *view.View

var (
// AppendRequestsView is a cumulative sum of AppendRequests.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestsView *view.View

// AppendBytesView is a cumulative sum of AppendBytes.
// AppendRequestBytesView is a cumulative sum of AppendRequestBytes.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendBytesView *view.View
AppendRequestBytesView *view.View

// AppendResponsesView is a cumulative sum of AppendResponses.
// AppendRequestErrorsView is a cumulative sum of AppendRequestErrors.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponsesView *view.View
AppendRequestErrorsView *view.View

// FlushRequestsView is a cumulative sum of FlushRequests.
// AppendRequestRowsView is a cumulative sum of AppendRows.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlushRequestsView *view.View
AppendRequestRowsView *view.View

// AppendClientOpenView is a cumulative sum of AppendClientOpenCount.
// AppendResponsesView is a cumulative sum of AppendResponses.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenView *view.View
AppendResponsesView *view.View

// AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount.
// AppendResponseErrorsView is a cumulative sum of AppendResponseErrors.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenRetryView *view.View
AppendResponseErrorsView *view.View

// FlushRequestsView is a cumulative sum of FlushRequests.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlushRequestsView *view.View
)

func init() {
AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin)
AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin)

AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin)
AppendBytesView = createSumView(stats.Measure(AppendBytes), keyStream, keyDataOrigin)
AppendRequestBytesView = createSumView(stats.Measure(AppendRequestBytes), keyStream, keyDataOrigin)
AppendRequestErrorsView = createSumView(stats.Measure(AppendRequestErrors), keyStream, keyDataOrigin, keyError)
AppendRequestRowsView = createSumView(stats.Measure(AppendRequestRows), keyStream, keyDataOrigin)

AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin)
AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError)

FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin)
AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin)
AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin)

DefaultOpenCensusViews = []*view.View{
AppendClientOpenView,
AppendClientOpenRetryView,

AppendRequestsView,
AppendRequestBytesView,
AppendRequestErrorsView,
AppendRequestRowsView,

AppendResponsesView,
AppendResponseErrorsView,

FlushRequestsView,
}
}

func createView(m stats.Measure, agg *view.Aggregation, keys ...tag.Key) *view.View {
Expand Down
14 changes: 14 additions & 0 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -21,7 +21,9 @@ import (
"sync"

"github.com/googleapis/gax-go/v2"
"go.opencensus.io/tag"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand Down Expand Up @@ -269,7 +271,14 @@ func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error
err = (*arc).Send(req)
}
recordStat(ms.ctx, AppendRequests, 1)
recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize))
recordStat(ms.ctx, AppendRequestRows, int64(len(pw.request.GetProtoRows().Rows.GetSerializedRows())))
if err != nil {
status := grpcstatus.Convert(err)
if status != nil {
ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String()))
recordStat(ctx, AppendRequestErrors, 1)
}
bo, shouldRetry := r.Retry(err)
if shouldRetry {
if err := gax.Sleep(ms.ctx, bo); err != nil {
Expand Down Expand Up @@ -366,6 +375,11 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl
recordStat(ctx, AppendResponses, 1)

if status := resp.GetError(); status != nil {
tagCtx, _ := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String()))
if err != nil {
tagCtx = ctx
}
recordStat(tagCtx, AppendResponseErrors, 1)
nextWrite.markDone(NoStreamOffset, grpcstatus.ErrorProto(status), fc)
continue
}
Expand Down