Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
Allow custom view.Meters to export metrics for other Resources (#1212)
Browse files Browse the repository at this point in the history
* Remove call to time.Now() on worker thread when handling record reqs (#1210)

Time is already recorded on the client side and stored in the currently unused recordReq.t
field. Avoiding these repeated calls to time.Now while the worker is blocked can significantly
reduce worker contention.

* Update Meter to track and report Resource for metric data.

Co-authored-by: Ian Milligan <ianmllgn@gmail.com>
  • Loading branch information
evankanderson and ian-mi committed Jun 8, 2020
1 parent 785d899 commit 1901b56
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 37 deletions.
5 changes: 4 additions & 1 deletion stats/view/view_to_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package view
import (
"time"

"go.opencensus.io/resource"

"go.opencensus.io/metric/metricdata"
"go.opencensus.io/stats"
)
Expand Down Expand Up @@ -125,7 +127,7 @@ func rowToTimeseries(v *viewInternal, row *Row, now time.Time, startTime time.Ti
}
}

func viewToMetric(v *viewInternal, now time.Time, startTime time.Time) *metricdata.Metric {
func viewToMetric(v *viewInternal, r *resource.Resource, now time.Time, startTime time.Time) *metricdata.Metric {
if v.metricDescriptor.Type == metricdata.TypeGaugeInt64 ||
v.metricDescriptor.Type == metricdata.TypeGaugeFloat64 {
startTime = time.Time{}
Expand All @@ -144,6 +146,7 @@ func viewToMetric(v *viewInternal, now time.Time, startTime time.Time) *metricda
m := &metricdata.Metric{
Descriptor: *v.metricDescriptor,
TimeSeries: ts,
Resource: r,
}
return m
}
4 changes: 2 additions & 2 deletions stats/view/view_to_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func Test_ViewToMetric(t *testing.T) {
tc.vi.addSample(tag.FromContext(ctx), v, nil, now)
}

gotMetric := viewToMetric(tc.vi, now, startTime)
gotMetric := viewToMetric(tc.vi, nil, now, startTime)
if !cmp.Equal(gotMetric, tc.wantMetric) {
// JSON format is strictly for checking the content when test fails. Do not use JSON
// format to determine if the two values are same as it doesn't differentiate between
Expand Down Expand Up @@ -509,7 +509,7 @@ func TestUnitConversionForAggCount(t *testing.T) {

for _, tc := range tests {
tc.vi.addSample(tag.FromContext(context.Background()), 5.0, nil, now)
gotMetric := viewToMetric(tc.vi, now, startTime)
gotMetric := viewToMetric(tc.vi, nil, now, startTime)
gotUnit := gotMetric.Descriptor.Unit
if !cmp.Equal(gotUnit, tc.wantUnit) {
t.Errorf("Verify Unit: %s: Got:%v Want:%v", tc.name, gotUnit, tc.wantUnit)
Expand Down
17 changes: 16 additions & 1 deletion stats/view/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"sync"
"time"

"go.opencensus.io/resource"

"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
"go.opencensus.io/stats"
Expand Down Expand Up @@ -47,6 +49,7 @@ type worker struct {
c chan command
quit, done chan bool
mu sync.RWMutex
r *resource.Resource

exportersMu sync.RWMutex
exporters map[Exporter]struct{}
Expand Down Expand Up @@ -91,6 +94,10 @@ type Meter interface {
RegisterExporter(Exporter)
// UnregisterExporter unregisters an exporter.
UnregisterExporter(Exporter)
// SetResource may be used to set the Resource associated with this registry.
// This is intended to be used in cases where a single process exports metrics
// for multiple Resources, typically in a multi-tenant situation.
SetResource(*resource.Resource)

// Start causes the Meter to start processing Record calls and aggregating
// statistics as well as exporting data.
Expand Down Expand Up @@ -249,6 +256,14 @@ func NewMeter() Meter {
}
}

// SetResource associates all data collected by this Meter with the specified
// resource. This resource is reported when using metricexport.ReadAndExport;
// it is not provided when used with ExportView/RegisterExporter, because that
// interface does not provide a means for reporting the Resource.
func (w *worker) SetResource(r *resource.Resource) {
w.r = r
}

func (w *worker) Start() {
go w.start()
}
Expand Down Expand Up @@ -371,7 +386,7 @@ func (w *worker) toMetric(v *viewInternal, now time.Time) *metricdata.Metric {
startTime = w.startTimes[v]
}

return viewToMetric(v, now, startTime)
return viewToMetric(v, w.r, now, startTime)
}

// Read reads all view data and returns them as metrics.
Expand Down
105 changes: 72 additions & 33 deletions stats/view/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package view
import (
"context"
"errors"
"sort"
"sync"
"testing"
"time"

"go.opencensus.io/resource"

"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricexport"
"go.opencensus.io/stats"
Expand Down Expand Up @@ -123,8 +126,13 @@ func Test_Worker_MultiExport(t *testing.T) {

// This test reports the same data for the default worker and a secondary
// worker, and ensures that the stats are kept independently.
extraResource := resource.Resource{
Type: "additional",
Labels: map[string]string{"key1": "value1", "key2": "value2"},
}
worker2 := NewMeter().(*worker)
worker2.Start()
worker2.SetResource(&extraResource)

m := stats.Float64("Test_Worker_MultiExport/MF1", "desc MF1", "unit")
key := tag.MustNewKey(("key"))
Expand Down Expand Up @@ -162,50 +170,62 @@ func Test_Worker_MultiExport(t *testing.T) {
}
}

wantRows := []struct {
w Meter
view string
rows []*Row
}{{
view: count.Name,
rows: []*Row{
makeKey := func(r *resource.Resource, view string) string {
if r == nil {
r = &resource.Resource{}
}
return resource.EncodeLabels(r.Labels) + "/" + view
}

// Format is Resource.Labels encoded as string, then
wantPartialData := map[string][]*Row{
makeKey(nil, count.Name): []*Row{
{[]tag.Tag{{Key: key, Value: "a"}}, &CountData{Value: 2}},
{[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}},
},
}, {
view: sum.Name,
rows: []*Row{
{nil, &SumData{Value: 7.5}}},
}, {
w: worker2,
view: count.Name,
rows: []*Row{
makeKey(nil, sum.Name): []*Row{
{nil, &SumData{Value: 7.5}},
},
makeKey(&extraResource, count.Name): []*Row{
{[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}},
},
}}
}

for _, wantRow := range wantRows {
retrieve := RetrieveData
if wantRow.w != nil {
retrieve = wantRow.w.(*worker).RetrieveData
}
gotRows, err := retrieve(wantRow.view)
if err != nil {
t.Fatalf("RetrieveData(%v), got error %v", wantRow.view, err)
te := &testExporter{}
metricexport.NewReader().ReadAndExport(te)
for _, m := range te.metrics {
key := makeKey(m.Resource, m.Descriptor.Name)
want, ok := wantPartialData[key]
if !ok {
t.Errorf("Unexpected data for %q: %v", key, m)
continue
}
for _, got := range gotRows {
if !containsRow(wantRow.rows, got) {
t.Errorf("%s: got row %#v; want none", wantRow.view, got)
break
gotTs := m.TimeSeries
sort.Sort(byLabel(gotTs))

for i, ts := range gotTs {
for j, label := range ts.LabelValues {
if want[i].Tags[j].Value != label.Value {
t.Errorf("Mismatched tag values (want %q, got %q) for %v in %q", want[i].Tags[j].Value, label.Value, ts, key)
}
}
}
for _, want := range wantRow.rows {
if !containsRow(gotRows, want) {
t.Errorf("%s: got none, want %#v", wantRow.view, want)
break
switch wantValue := want[i].Data.(type) {
case *CountData:
got := ts.Points[0].Value.(int64)
if wantValue.Value != got {
t.Errorf("Mismatched value (want %d, got %d) for %v in %q", wantValue, got, ts, key)
}
case *SumData:
got := ts.Points[0].Value.(float64)
if wantValue.Value != got {
t.Errorf("Mismatched value (want %f, got %f) for %v in %q", wantValue, got, ts, key)
}
default:
t.Errorf("Unexpected type of data: %T for %v in %q", wantValue, want[i], key)
}
}
}

// Verify that worker has not been computing sum:
got, err := worker2.RetrieveData(sum.Name)
if err == nil {
Expand Down Expand Up @@ -577,9 +597,11 @@ func TestWorkerRace(t *testing.T) {
}

type testExporter struct {
metrics []*metricdata.Metric
}

func (te *testExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error {
te.metrics = metrics
return nil
}

Expand Down Expand Up @@ -619,3 +641,20 @@ func restart() {
defaultWorker = NewMeter().(*worker)
go defaultWorker.start()
}

// byTag implements sort.Interface for *metricdata.TimeSeries by Labels.
type byLabel []*metricdata.TimeSeries

func (ts byLabel) Len() int { return len(ts) }
func (ts byLabel) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
func (ts byLabel) Less(i, j int) bool {
if len(ts[i].LabelValues) != len(ts[j].LabelValues) {
return len(ts[i].LabelValues) < len(ts[j].LabelValues)
}
for k := range ts[i].LabelValues {
if ts[i].LabelValues[k].Value != ts[j].LabelValues[k].Value {
return ts[i].LabelValues[k].Value < ts[j].LabelValues[k].Value
}
}
return false
}

0 comments on commit 1901b56

Please sign in to comment.