Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): add trace instrumentation support for individual rpcs #6493

Merged
merged 12 commits into from Sep 2, 2022
5 changes: 5 additions & 0 deletions bigquery/bigquery.go
Expand Up @@ -26,6 +26,7 @@ import (
"cloud.google.com/go/bigquery/internal"
cloudinternal "cloud.google.com/go/internal"
"cloud.google.com/go/internal/detect"
"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/version"
gax "github.com/googleapis/gax-go/v2"
bq "google.golang.org/api/bigquery/v2"
Expand Down Expand Up @@ -119,7 +120,9 @@ func (c *Client) insertJob(ctx context.Context, job *bq.Job, media io.Reader) (*
var res *bq.Job
var err error
invoke := func() error {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.insert")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
}
// A job with a client-generated ID can be retried; the presence of the
Expand Down Expand Up @@ -149,7 +152,9 @@ func (c *Client) runQuery(ctx context.Context, queryRequest *bq.QueryRequest) (*
var res *bq.QueryResponse
var err error
invoke := func() error {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.query")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
}

Expand Down
19 changes: 18 additions & 1 deletion bigquery/dataset.go
Expand Up @@ -216,7 +216,12 @@ func (d *Dataset) deleteInternal(ctx context.Context, deleteContents bool) (err

call := d.c.bqs.Datasets.Delete(d.ProjectID, d.DatasetID).Context(ctx).DeleteContents(deleteContents)
setClientHeader(call.Header())
return call.Do()
return runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.datasets.delete")
err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
}

// Metadata fetches the metadata for the dataset.
Expand All @@ -228,7 +233,9 @@ func (d *Dataset) Metadata(ctx context.Context) (md *DatasetMetadata, err error)
setClientHeader(call.Header())
var ds *bq.Dataset
if err := runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.datasets.get")
ds, err = call.Do()
trace.EndSpan(sCtx, err)
return err
}); err != nil {
return nil, err
Expand Down Expand Up @@ -284,7 +291,9 @@ func (d *Dataset) Update(ctx context.Context, dm DatasetMetadataToUpdate, etag s
}
var ds2 *bq.Dataset
if err := runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.datasets.patch")
ds2, err = call.Do()
trace.EndSpan(sCtx, err)
return err
}); err != nil {
return nil, err
Expand Down Expand Up @@ -391,7 +400,9 @@ var listTables = func(it *TableIterator, pageSize int, pageToken string) (*bq.Ta
}
var res *bq.TableList
err := runWithRetry(it.ctx, func() (err error) {
sCtx := trace.StartSpan(it.ctx, "bigquery.tables.list")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
return res, err
Expand Down Expand Up @@ -476,7 +487,9 @@ var listModels = func(it *ModelIterator, pageSize int, pageToken string) (*bq.Li
}
var res *bq.ListModelsResponse
err := runWithRetry(it.ctx, func() (err error) {
sCtx := trace.StartSpan(it.ctx, "bigquery.models.list")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
return res, err
Expand Down Expand Up @@ -563,7 +576,9 @@ var listRoutines = func(it *RoutineIterator, pageSize int, pageToken string) (*b
}
var res *bq.ListRoutinesResponse
err := runWithRetry(it.ctx, func() (err error) {
sCtx := trace.StartSpan(it.ctx, "bigquery.routines.list")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
return res, err
Expand Down Expand Up @@ -667,7 +682,9 @@ var listDatasets = func(it *DatasetIterator, pageSize int, pageToken string) (*b
}
var res *bq.DatasetList
err := runWithRetry(it.ctx, func() (err error) {
sCtx := trace.StartSpan(it.ctx, "bigquery.datasets.list")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
return res, err
Expand Down
2 changes: 2 additions & 0 deletions bigquery/inserter.go
Expand Up @@ -182,7 +182,9 @@ func (u *Inserter) putMulti(ctx context.Context, src []ValueSaver) error {
setClientHeader(call.Header())
var res *bq.TableDataInsertAllResponse
err = runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.tabledata.insertAll")
res, err = call.Do()
trace.EndSpan(ctx, err)
return err
})
if err != nil {
Expand Down
17 changes: 16 additions & 1 deletion bigquery/job.go
Expand Up @@ -240,7 +240,9 @@ func (j *Job) Cancel(ctx context.Context) error {
Context(ctx)
setClientHeader(call.Header())
return runWithRetry(ctx, func() error {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.cancel")
_, err := call.Do()
trace.EndSpan(sCtx, err)
return err
})
}
Expand All @@ -257,7 +259,9 @@ func (j *Job) Delete(ctx context.Context) (err error) {
setClientHeader(call.Header())

return runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.delete")
err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
}
Expand Down Expand Up @@ -343,7 +347,9 @@ func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, uint6
}
var res *bq.GetQueryResultsResponse
err := internal.Retry(ctx, backoff, func() (stop bool, err error) {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.getQueryResults")
res, err = call.Do()
trace.EndSpan(sCtx, err)
if err != nil {
return !retryableError(err, jobRetryReasons), err
}
Expand Down Expand Up @@ -837,7 +843,14 @@ func (it *JobIterator) fetch(pageSize int, pageToken string) (string, error) {
if it.ParentJobID != "" {
req.ParentJobId(it.ParentJobID)
}
res, err := req.Do()
var res *bq.JobList
err := runWithRetry(it.ctx, func() (err error) {
sCtx := trace.StartSpan(it.ctx, "bigquery.jobs.list")
res, err = req.Do()
trace.EndSpan(sCtx, err)
return err
})

if err != nil {
return "", err
}
Expand Down Expand Up @@ -870,7 +883,9 @@ func (c *Client) getJobInternal(ctx context.Context, jobID, location, projectID
}
setClientHeader(call.Header())
err := runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.get")
job, err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions bigquery/model.go
Expand Up @@ -86,7 +86,9 @@ func (m *Model) Metadata(ctx context.Context) (mm *ModelMetadata, err error) {
setClientHeader(req.Header())
var model *bq.Model
err = runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.models.get")
model, err = req.Do()
trace.EndSpan(ctx, err)
return err
})
if err != nil {
Expand All @@ -111,7 +113,9 @@ func (m *Model) Update(ctx context.Context, mm ModelMetadataToUpdate, etag strin
}
var res *bq.Model
if err := runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.models.patch")
res, err = call.Do()
trace.EndSpan(ctx, err)
return err
}); err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions bigquery/routine.go
Expand Up @@ -97,7 +97,9 @@ func (r *Routine) Metadata(ctx context.Context) (rm *RoutineMetadata, err error)
setClientHeader(req.Header())
var routine *bq.Routine
err = runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.routines.get")
routine, err = req.Do()
trace.EndSpan(ctx, err)
return err
})
if err != nil {
Expand Down Expand Up @@ -129,7 +131,9 @@ func (r *Routine) Update(ctx context.Context, upd *RoutineMetadataToUpdate, etag
}
var res *bq.Routine
if err := runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.routines.update")
res, err = call.Do()
trace.EndSpan(ctx, err)
return err
}); err != nil {
return nil, err
Expand Down
8 changes: 8 additions & 0 deletions bigquery/table.go
Expand Up @@ -581,7 +581,9 @@ func (t *Table) Create(ctx context.Context, tm *TableMetadata) (err error) {
req := t.c.bqs.Tables.Insert(t.ProjectID, t.DatasetID, table).Context(ctx)
setClientHeader(req.Header())
return runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.tables.insert")
_, err = req.Do()
trace.EndSpan(ctx, err)
return err
})
}
Expand Down Expand Up @@ -716,7 +718,9 @@ func (t *Table) Metadata(ctx context.Context, opts ...TableMetadataOption) (md *
setClientHeader(tgc.call.Header())
var res *bq.Table
if err := runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.tables.get")
res, err = tgc.call.Do()
trace.EndSpan(sCtx, err)
return err
}); err != nil {
return nil, err
Expand Down Expand Up @@ -783,7 +787,9 @@ func (t *Table) Delete(ctx context.Context) (err error) {
setClientHeader(call.Header())

return runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.tables.delete")
err = call.Do()
trace.EndSpan(ctx, err)
return err
})
}
Expand Down Expand Up @@ -841,7 +847,9 @@ func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag strin
}
var res *bq.Table
if err := runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.tables.patch")
res, err = tpc.call.Do()
trace.EndSpan(ctx, err)
return err
}); err != nil {
return nil, err
Expand Down
99 changes: 99 additions & 0 deletions bigquery/trace_integration_test.go
@@ -0,0 +1,99 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
"context"
"strings"
"testing"
"time"

"go.opencensus.io/trace"
)

// testExporter is a testing exporter for validating captured spans.
type testExporter struct {
spans []*trace.SpanData
}

func (te *testExporter) ExportSpan(s *trace.SpanData) {
te.spans = append(te.spans, s)
}

// hasSpans checks that the exporter has all the span names
// specified in the slice. It returns the unmatched names.
func (te *testExporter) hasSpans(names []string) []string {
matches := make(map[string]struct{})
for _, n := range names {
matches[n] = struct{}{}
}
for _, s := range te.spans {
delete(matches, s.Name)
}
var unmatched []string
for k := range matches {
unmatched = append(unmatched, k)
}
return unmatched
}

func TestIntegration_Tracing(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
}

ctx := context.Background()

for _, tc := range []struct {
description string
callF func(ctx context.Context)
wantSpans []string
}{
{
description: "fast path query",
callF: func(ctx context.Context) {
client.Query("SELECT SESSION_USER()").Read(ctx)
},
wantSpans: []string{"bigquery.jobs.query", "cloud.google.com/go/bigquery.Query.Run"},
},
{
description: "slow path query",
callF: func(ctx context.Context) {
q := client.Query("SELECT SESSION_USER()")
q.JobTimeout = time.Hour
q.Read(ctx)
},
wantSpans: []string{"bigquery.jobs.insert", "bigquery.jobs.getQueryResults", "cloud.google.com/go/bigquery.Job.Read", "cloud.google.com/go/bigquery.Query.Run"},
},
{
description: "table metadata",
callF: func(ctx context.Context) {
client.DatasetInProject("bigquery-public-data", "samples").Table("shakespeare").Metadata(ctx)
},
wantSpans: []string{"bigquery.tables.get", "cloud.google.com/go/bigquery.Table.Metadata"},
},
} {
exporter := &testExporter{}
trace.RegisterExporter(exporter)
traceCtx, span := trace.StartSpan(ctx, "testspan", trace.WithSampler(trace.AlwaysSample()))
tc.callF(traceCtx)
span.End()
trace.UnregisterExporter(exporter)

if unmatched := exporter.hasSpans(tc.wantSpans); len(unmatched) > 0 {
t.Errorf("case (%s): unmatched spans: %s", tc.description, strings.Join(unmatched, ","))
}
}
}