diff --git a/bigquery/bigquery.go b/bigquery/bigquery.go index 34a39e524bc..9b473572223 100644 --- a/bigquery/bigquery.go +++ b/bigquery/bigquery.go @@ -26,6 +26,7 @@ import ( "cloud.google.com/go/bigquery/internal" cloudinternal "cloud.google.com/go/internal" "cloud.google.com/go/internal/detect" + "cloud.google.com/go/internal/trace" "cloud.google.com/go/internal/version" gax "github.com/googleapis/gax-go/v2" bq "google.golang.org/api/bigquery/v2" @@ -119,7 +120,9 @@ func (c *Client) insertJob(ctx context.Context, job *bq.Job, media io.Reader) (* var res *bq.Job var err error invoke := func() error { + sCtx := trace.StartSpan(ctx, "bigquery.jobs.insert") res, err = call.Do() + trace.EndSpan(sCtx, err) return err } // A job with a client-generated ID can be retried; the presence of the @@ -149,7 +152,9 @@ func (c *Client) runQuery(ctx context.Context, queryRequest *bq.QueryRequest) (* var res *bq.QueryResponse var err error invoke := func() error { + sCtx := trace.StartSpan(ctx, "bigquery.jobs.query") res, err = call.Do() + trace.EndSpan(sCtx, err) return err } diff --git a/bigquery/dataset.go b/bigquery/dataset.go index a9deeb3295e..d6bb60dc265 100644 --- a/bigquery/dataset.go +++ b/bigquery/dataset.go @@ -216,7 +216,12 @@ func (d *Dataset) deleteInternal(ctx context.Context, deleteContents bool) (err call := d.c.bqs.Datasets.Delete(d.ProjectID, d.DatasetID).Context(ctx).DeleteContents(deleteContents) setClientHeader(call.Header()) - return call.Do() + return runWithRetry(ctx, func() (err error) { + sCtx := trace.StartSpan(ctx, "bigquery.datasets.delete") + err = call.Do() + trace.EndSpan(sCtx, err) + return err + }) } // Metadata fetches the metadata for the dataset. @@ -228,7 +233,9 @@ func (d *Dataset) Metadata(ctx context.Context) (md *DatasetMetadata, err error) setClientHeader(call.Header()) var ds *bq.Dataset if err := runWithRetry(ctx, func() (err error) { + sCtx := trace.StartSpan(ctx, "bigquery.datasets.get") ds, err = call.Do() + trace.EndSpan(sCtx, err) return err }); err != nil { return nil, err @@ -284,7 +291,9 @@ func (d *Dataset) Update(ctx context.Context, dm DatasetMetadataToUpdate, etag s } var ds2 *bq.Dataset if err := runWithRetry(ctx, func() (err error) { + sCtx := trace.StartSpan(ctx, "bigquery.datasets.patch") ds2, err = call.Do() + trace.EndSpan(sCtx, err) return err }); err != nil { return nil, err @@ -391,7 +400,9 @@ var listTables = func(it *TableIterator, pageSize int, pageToken string) (*bq.Ta } var res *bq.TableList err := runWithRetry(it.ctx, func() (err error) { + sCtx := trace.StartSpan(it.ctx, "bigquery.tables.list") res, err = call.Do() + trace.EndSpan(sCtx, err) return err }) return res, err @@ -476,7 +487,9 @@ var listModels = func(it *ModelIterator, pageSize int, pageToken string) (*bq.Li } var res *bq.ListModelsResponse err := runWithRetry(it.ctx, func() (err error) { + sCtx := trace.StartSpan(it.ctx, "bigquery.models.list") res, err = call.Do() + trace.EndSpan(sCtx, err) return err }) return res, err @@ -563,7 +576,9 @@ var listRoutines = func(it *RoutineIterator, pageSize int, pageToken string) (*b } var res *bq.ListRoutinesResponse err := runWithRetry(it.ctx, func() (err error) { + sCtx := trace.StartSpan(it.ctx, "bigquery.routines.list") res, err = call.Do() + trace.EndSpan(sCtx, err) return err }) return res, err @@ -667,7 +682,9 @@ var listDatasets = func(it *DatasetIterator, pageSize int, pageToken string) (*b } var res *bq.DatasetList err := runWithRetry(it.ctx, func() (err error) { + sCtx := trace.StartSpan(it.ctx, "bigquery.datasets.list") res, err = call.Do() + trace.EndSpan(sCtx, err) return err }) return res, err diff --git a/bigquery/inserter.go b/bigquery/inserter.go index 7fce5515317..f0ede5be0a9 100644 --- a/bigquery/inserter.go +++ b/bigquery/inserter.go @@ -182,7 +182,9 @@ func (u *Inserter) putMulti(ctx context.Context, src []ValueSaver) error { setClientHeader(call.Header()) var res *bq.TableDataInsertAllResponse err = runWithRetry(ctx, func() (err error) { + ctx = trace.StartSpan(ctx, "bigquery.tabledata.insertAll") res, err = call.Do() + trace.EndSpan(ctx, err) return err }) if err != nil { diff --git a/bigquery/job.go b/bigquery/job.go index 52f3b63876c..d761650fed8 100644 --- a/bigquery/job.go +++ b/bigquery/job.go @@ -240,7 +240,9 @@ func (j *Job) Cancel(ctx context.Context) error { Context(ctx) setClientHeader(call.Header()) return runWithRetry(ctx, func() error { + sCtx := trace.StartSpan(ctx, "bigquery.jobs.cancel") _, err := call.Do() + trace.EndSpan(sCtx, err) return err }) } @@ -257,7 +259,9 @@ func (j *Job) Delete(ctx context.Context) (err error) { setClientHeader(call.Header()) return runWithRetry(ctx, func() (err error) { + sCtx := trace.StartSpan(ctx, "bigquery.jobs.delete") err = call.Do() + trace.EndSpan(sCtx, err) return err }) } @@ -343,7 +347,9 @@ func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, uint6 } var res *bq.GetQueryResultsResponse err := internal.Retry(ctx, backoff, func() (stop bool, err error) { + sCtx := trace.StartSpan(ctx, "bigquery.jobs.getQueryResults") res, err = call.Do() + trace.EndSpan(sCtx, err) if err != nil { return !retryableError(err, jobRetryReasons), err } @@ -837,7 +843,14 @@ func (it *JobIterator) fetch(pageSize int, pageToken string) (string, error) { if it.ParentJobID != "" { req.ParentJobId(it.ParentJobID) } - res, err := req.Do() + var res *bq.JobList + err := runWithRetry(it.ctx, func() (err error) { + sCtx := trace.StartSpan(it.ctx, "bigquery.jobs.list") + res, err = req.Do() + trace.EndSpan(sCtx, err) + return err + }) + if err != nil { return "", err } @@ -870,7 +883,9 @@ func (c *Client) getJobInternal(ctx context.Context, jobID, location, projectID } setClientHeader(call.Header()) err := runWithRetry(ctx, func() (err error) { + sCtx := trace.StartSpan(ctx, "bigquery.jobs.get") job, err = call.Do() + trace.EndSpan(sCtx, err) return err }) if err != nil { diff --git a/bigquery/model.go b/bigquery/model.go index 56260efe9c3..7fb90493e9b 100644 --- a/bigquery/model.go +++ b/bigquery/model.go @@ -86,7 +86,9 @@ func (m *Model) Metadata(ctx context.Context) (mm *ModelMetadata, err error) { setClientHeader(req.Header()) var model *bq.Model err = runWithRetry(ctx, func() (err error) { + ctx = trace.StartSpan(ctx, "bigquery.models.get") model, err = req.Do() + trace.EndSpan(ctx, err) return err }) if err != nil { @@ -111,7 +113,9 @@ func (m *Model) Update(ctx context.Context, mm ModelMetadataToUpdate, etag strin } var res *bq.Model if err := runWithRetry(ctx, func() (err error) { + ctx = trace.StartSpan(ctx, "bigquery.models.patch") res, err = call.Do() + trace.EndSpan(ctx, err) return err }); err != nil { return nil, err diff --git a/bigquery/routine.go b/bigquery/routine.go index d6aba1e0635..2185a146897 100644 --- a/bigquery/routine.go +++ b/bigquery/routine.go @@ -97,7 +97,9 @@ func (r *Routine) Metadata(ctx context.Context) (rm *RoutineMetadata, err error) setClientHeader(req.Header()) var routine *bq.Routine err = runWithRetry(ctx, func() (err error) { + ctx = trace.StartSpan(ctx, "bigquery.routines.get") routine, err = req.Do() + trace.EndSpan(ctx, err) return err }) if err != nil { @@ -129,7 +131,9 @@ func (r *Routine) Update(ctx context.Context, upd *RoutineMetadataToUpdate, etag } var res *bq.Routine if err := runWithRetry(ctx, func() (err error) { + ctx = trace.StartSpan(ctx, "bigquery.routines.update") res, err = call.Do() + trace.EndSpan(ctx, err) return err }); err != nil { return nil, err diff --git a/bigquery/table.go b/bigquery/table.go index 5ff66f39f21..3bfad2344e7 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -581,7 +581,9 @@ func (t *Table) Create(ctx context.Context, tm *TableMetadata) (err error) { req := t.c.bqs.Tables.Insert(t.ProjectID, t.DatasetID, table).Context(ctx) setClientHeader(req.Header()) return runWithRetry(ctx, func() (err error) { + ctx = trace.StartSpan(ctx, "bigquery.tables.insert") _, err = req.Do() + trace.EndSpan(ctx, err) return err }) } @@ -716,7 +718,9 @@ func (t *Table) Metadata(ctx context.Context, opts ...TableMetadataOption) (md * setClientHeader(tgc.call.Header()) var res *bq.Table if err := runWithRetry(ctx, func() (err error) { + sCtx := trace.StartSpan(ctx, "bigquery.tables.get") res, err = tgc.call.Do() + trace.EndSpan(sCtx, err) return err }); err != nil { return nil, err @@ -783,7 +787,9 @@ func (t *Table) Delete(ctx context.Context) (err error) { setClientHeader(call.Header()) return runWithRetry(ctx, func() (err error) { + ctx = trace.StartSpan(ctx, "bigquery.tables.delete") err = call.Do() + trace.EndSpan(ctx, err) return err }) } @@ -841,7 +847,9 @@ func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag strin } var res *bq.Table if err := runWithRetry(ctx, func() (err error) { + ctx = trace.StartSpan(ctx, "bigquery.tables.patch") res, err = tpc.call.Do() + trace.EndSpan(ctx, err) return err }); err != nil { return nil, err diff --git a/bigquery/trace_integration_test.go b/bigquery/trace_integration_test.go new file mode 100644 index 00000000000..e3a3aaf00ca --- /dev/null +++ b/bigquery/trace_integration_test.go @@ -0,0 +1,99 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bigquery + +import ( + "context" + "strings" + "testing" + "time" + + "go.opencensus.io/trace" +) + +// testExporter is a testing exporter for validating captured spans. +type testExporter struct { + spans []*trace.SpanData +} + +func (te *testExporter) ExportSpan(s *trace.SpanData) { + te.spans = append(te.spans, s) +} + +// hasSpans checks that the exporter has all the span names +// specified in the slice. It returns the unmatched names. +func (te *testExporter) hasSpans(names []string) []string { + matches := make(map[string]struct{}) + for _, n := range names { + matches[n] = struct{}{} + } + for _, s := range te.spans { + delete(matches, s.Name) + } + var unmatched []string + for k := range matches { + unmatched = append(unmatched, k) + } + return unmatched +} + +func TestIntegration_Tracing(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + + ctx := context.Background() + + for _, tc := range []struct { + description string + callF func(ctx context.Context) + wantSpans []string + }{ + { + description: "fast path query", + callF: func(ctx context.Context) { + client.Query("SELECT SESSION_USER()").Read(ctx) + }, + wantSpans: []string{"bigquery.jobs.query", "cloud.google.com/go/bigquery.Query.Run"}, + }, + { + description: "slow path query", + callF: func(ctx context.Context) { + q := client.Query("SELECT SESSION_USER()") + q.JobTimeout = time.Hour + q.Read(ctx) + }, + wantSpans: []string{"bigquery.jobs.insert", "bigquery.jobs.getQueryResults", "cloud.google.com/go/bigquery.Job.Read", "cloud.google.com/go/bigquery.Query.Run"}, + }, + { + description: "table metadata", + callF: func(ctx context.Context) { + client.DatasetInProject("bigquery-public-data", "samples").Table("shakespeare").Metadata(ctx) + }, + wantSpans: []string{"bigquery.tables.get", "cloud.google.com/go/bigquery.Table.Metadata"}, + }, + } { + exporter := &testExporter{} + trace.RegisterExporter(exporter) + traceCtx, span := trace.StartSpan(ctx, "testspan", trace.WithSampler(trace.AlwaysSample())) + tc.callF(traceCtx) + span.End() + trace.UnregisterExporter(exporter) + + if unmatched := exporter.hasSpans(tc.wantSpans); len(unmatched) > 0 { + t.Errorf("case (%s): unmatched spans: %s", tc.description, strings.Join(unmatched, ",")) + } + } +}