Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): add trace instrumentation support for individual rpcs #6493

Merged
merged 12 commits into from Sep 2, 2022
19 changes: 18 additions & 1 deletion bigquery/dataset.go
Expand Up @@ -216,7 +216,12 @@ func (d *Dataset) deleteInternal(ctx context.Context, deleteContents bool) (err

call := d.c.bqs.Datasets.Delete(d.ProjectID, d.DatasetID).Context(ctx).DeleteContents(deleteContents)
setClientHeader(call.Header())
return call.Do()
return runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.datasets.delete")
err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
}

// Metadata fetches the metadata for the dataset.
Expand All @@ -228,7 +233,9 @@ func (d *Dataset) Metadata(ctx context.Context) (md *DatasetMetadata, err error)
setClientHeader(call.Header())
var ds *bq.Dataset
if err := runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.datasets.get")
ds, err = call.Do()
trace.EndSpan(sCtx, err)
return err
}); err != nil {
return nil, err
Expand Down Expand Up @@ -284,7 +291,9 @@ func (d *Dataset) Update(ctx context.Context, dm DatasetMetadataToUpdate, etag s
}
var ds2 *bq.Dataset
if err := runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.datasets.patch")
ds2, err = call.Do()
trace.EndSpan(sCtx, err)
return err
}); err != nil {
return nil, err
Expand Down Expand Up @@ -391,7 +400,9 @@ var listTables = func(it *TableIterator, pageSize int, pageToken string) (*bq.Ta
}
var res *bq.TableList
err := runWithRetry(it.ctx, func() (err error) {
sCtx := trace.StartSpan(it.ctx, "bigquery.tables.list")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
return res, err
Expand Down Expand Up @@ -476,7 +487,9 @@ var listModels = func(it *ModelIterator, pageSize int, pageToken string) (*bq.Li
}
var res *bq.ListModelsResponse
err := runWithRetry(it.ctx, func() (err error) {
sCtx := trace.StartSpan(it.ctx, "bigquery.models.list")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
return res, err
Expand Down Expand Up @@ -563,7 +576,9 @@ var listRoutines = func(it *RoutineIterator, pageSize int, pageToken string) (*b
}
var res *bq.ListRoutinesResponse
err := runWithRetry(it.ctx, func() (err error) {
sCtx := trace.StartSpan(it.ctx, "bigquery.routines.list")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
return res, err
Expand Down Expand Up @@ -667,7 +682,9 @@ var listDatasets = func(it *DatasetIterator, pageSize int, pageToken string) (*b
}
var res *bq.DatasetList
err := runWithRetry(it.ctx, func() (err error) {
sCtx := trace.StartSpan(it.ctx, "bigquery.datasets.list")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
return res, err
Expand Down
2 changes: 2 additions & 0 deletions bigquery/inserter.go
Expand Up @@ -182,7 +182,9 @@ func (u *Inserter) putMulti(ctx context.Context, src []ValueSaver) error {
setClientHeader(call.Header())
var res *bq.TableDataInsertAllResponse
err = runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.tabledata.insertAll")
res, err = call.Do()
trace.EndSpan(ctx, err)
return err
})
if err != nil {
Expand Down
17 changes: 16 additions & 1 deletion bigquery/job.go
Expand Up @@ -240,7 +240,9 @@ func (j *Job) Cancel(ctx context.Context) error {
Context(ctx)
setClientHeader(call.Header())
return runWithRetry(ctx, func() error {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.cancel")
_, err := call.Do()
trace.EndSpan(sCtx, err)
return err
})
}
Expand All @@ -257,7 +259,9 @@ func (j *Job) Delete(ctx context.Context) (err error) {
setClientHeader(call.Header())

return runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.delete")
err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
}
Expand Down Expand Up @@ -343,7 +347,9 @@ func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, uint6
}
var res *bq.GetQueryResultsResponse
err := internal.Retry(ctx, backoff, func() (stop bool, err error) {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.getQueryResults")
res, err = call.Do()
trace.EndSpan(sCtx, err)
if err != nil {
return !retryableError(err, jobRetryReasons), err
}
Expand Down Expand Up @@ -837,7 +843,14 @@ func (it *JobIterator) fetch(pageSize int, pageToken string) (string, error) {
if it.ParentJobID != "" {
req.ParentJobId(it.ParentJobID)
}
res, err := req.Do()
var res *bq.JobList
err := runWithRetry(it.ctx, func() (err error) {
sCtx := trace.StartSpan(it.ctx, "bigquery.jobs.list")
res, err = req.Do()
trace.EndSpan(sCtx, err)
return err
})

if err != nil {
return "", err
}
Expand Down Expand Up @@ -870,7 +883,9 @@ func (c *Client) getJobInternal(ctx context.Context, jobID, location, projectID
}
setClientHeader(call.Header())
err := runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.get")
job, err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions bigquery/model.go
Expand Up @@ -86,7 +86,9 @@ func (m *Model) Metadata(ctx context.Context) (mm *ModelMetadata, err error) {
setClientHeader(req.Header())
var model *bq.Model
err = runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.models.get")
model, err = req.Do()
trace.EndSpan(ctx, err)
return err
})
if err != nil {
Expand All @@ -111,7 +113,9 @@ func (m *Model) Update(ctx context.Context, mm ModelMetadataToUpdate, etag strin
}
var res *bq.Model
if err := runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.models.patch")
res, err = call.Do()
trace.EndSpan(ctx, err)
return err
}); err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions bigquery/routine.go
Expand Up @@ -97,7 +97,9 @@ func (r *Routine) Metadata(ctx context.Context) (rm *RoutineMetadata, err error)
setClientHeader(req.Header())
var routine *bq.Routine
err = runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.routines.get")
routine, err = req.Do()
trace.EndSpan(ctx, err)
return err
})
if err != nil {
Expand Down Expand Up @@ -129,7 +131,9 @@ func (r *Routine) Update(ctx context.Context, upd *RoutineMetadataToUpdate, etag
}
var res *bq.Routine
if err := runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.routines.update")
res, err = call.Do()
trace.EndSpan(ctx, err)
return err
}); err != nil {
return nil, err
Expand Down
8 changes: 8 additions & 0 deletions bigquery/table.go
Expand Up @@ -581,7 +581,9 @@ func (t *Table) Create(ctx context.Context, tm *TableMetadata) (err error) {
req := t.c.bqs.Tables.Insert(t.ProjectID, t.DatasetID, table).Context(ctx)
setClientHeader(req.Header())
return runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.tables.insert")
_, err = req.Do()
trace.EndSpan(ctx, err)
return err
})
}
Expand Down Expand Up @@ -716,7 +718,9 @@ func (t *Table) Metadata(ctx context.Context, opts ...TableMetadataOption) (md *
setClientHeader(tgc.call.Header())
var res *bq.Table
if err := runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.tables.get")
res, err = tgc.call.Do()
trace.EndSpan(ctx, err)
return err
}); err != nil {
return nil, err
Expand Down Expand Up @@ -783,7 +787,9 @@ func (t *Table) Delete(ctx context.Context) (err error) {
setClientHeader(call.Header())

return runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.tables.delete")
err = call.Do()
trace.EndSpan(ctx, err)
return err
})
}
Expand Down Expand Up @@ -841,7 +847,9 @@ func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag strin
}
var res *bq.Table
if err := runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.tables.patch")
res, err = tpc.call.Do()
trace.EndSpan(ctx, err)
return err
}); err != nil {
return nil, err
Expand Down