Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Allow creating additional View universes. #1196

Merged
merged 7 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 17 additions & 0 deletions stats/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
_ "go.opencensus.io/stats/view" // enable collection
"go.opencensus.io/tag"
)
Expand Down Expand Up @@ -52,6 +53,22 @@ func BenchmarkRecord8(b *testing.B) {
}
}

func BenchmarkRecord8_WithRecorder(b *testing.B) {
ctx := context.Background()
meter := view.NewMeter()
meter.Start()
defer meter.Stop()
b.ResetTimer()

for i := 0; i < b.N; i++ {
// Note that this benchmark has one extra allocation for stats.WithRecorder.
// If you cache the recorder option, this benchmark should be equally fast as BenchmarkRecord8
stats.RecordWithOptions(ctx, stats.WithRecorder(meter), stats.WithMeasurements(m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1)))
}

b.StopTimer()
}

func BenchmarkRecord8_Parallel(b *testing.B) {
ctx := context.Background()
b.ResetTimer()
Expand Down
20 changes: 20 additions & 0 deletions stats/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,19 @@ func init() {
}
}

// Recorder provides an interface for exporting measurement information from
// the static Record method by using the WithRecorder option.
type Recorder interface {
// Record records a set of measurements associated with the given tags and attachments.
// The second argument is a `[]Measurement`.
Record(*tag.Map, interface{}, map[string]interface{})
}

type recordOptions struct {
attachments metricdata.Attachments
mutators []tag.Mutator
measurements []Measurement
recorder Recorder
}

// WithAttachments applies provided exemplar attachments.
Expand All @@ -58,6 +67,14 @@ func WithMeasurements(measurements ...Measurement) Options {
}
}

// WithRecorder records the measurements to the specified `Recorder`, rather
// than to the global metrics recorder.
func WithRecorder(meter Recorder) Options {
return func(ro *recordOptions) {
ro.recorder = meter
}
}

// Options apply changes to recordOptions.
type Options func(*recordOptions)

Expand Down Expand Up @@ -93,6 +110,9 @@ func RecordWithOptions(ctx context.Context, ros ...Options) error {
return nil
}
recorder := internal.DefaultRecorder
if o.recorder != nil {
recorder = o.recorder.Record
}
if recorder == nil {
return nil
}
Expand Down
106 changes: 106 additions & 0 deletions stats/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestRecordWithAttachments(t *testing.T) {
if err := view.Register(v); err != nil {
log.Fatalf("Failed to register views: %v", err)
}
defer view.Unregister(v)

attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: spanCtx}
stats.RecordWithOptions(context.Background(), stats.WithAttachments(attachments), stats.WithMeasurements(m.M(12)))
Expand Down Expand Up @@ -93,3 +94,108 @@ func TestRecordWithAttachments(t *testing.T) {
func cmpExemplar(got, want *metricdata.Exemplar) string {
return cmp.Diff(got, want, cmpopts.IgnoreFields(metricdata.Exemplar{}, "Timestamp"), cmpopts.IgnoreUnexported(metricdata.Exemplar{}))
}

func TestRecordWithMeter(t *testing.T) {
meter := view.NewMeter()
meter.Start()
defer meter.Stop()
k1 := tag.MustNewKey("k1")
k2 := tag.MustNewKey("k2")
m1 := stats.Int64("TestResolveOptions/m1", "", stats.UnitDimensionless)
m2 := stats.Int64("TestResolveOptions/m2", "", stats.UnitDimensionless)
v := []*view.View{{
Name: "test_view",
TagKeys: []tag.Key{k1, k2},
Measure: m1,
Aggregation: view.Distribution(5, 10),
}, {
Name: "second_view",
TagKeys: []tag.Key{k1},
Measure: m2,
Aggregation: view.Count(),
}}
meter.SetReportingPeriod(100 * time.Millisecond)
if err := meter.Register(v...); err != nil {
t.Fatalf("Failed to register view: %v", err)
}
defer meter.Unregister(v...)

attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: spanCtx}
ctx, err := tag.New(context.Background(), tag.Insert(k1, "foo"), tag.Insert(k2, "foo"))
if err != nil {
t.Fatalf("Failed to set context: %v", err)
}
err = stats.RecordWithOptions(ctx,
stats.WithTags(tag.Upsert(k1, "bar"), tag.Insert(k2, "bar")),
stats.WithAttachments(attachments),
stats.WithMeasurements(m1.M(12), m1.M(6), m2.M(5)),
stats.WithRecorder(meter))
if err != nil {
t.Fatalf("Failed to resolve data point: %v", err)
}

rows, err := meter.RetrieveData("test_view")
if err != nil {
t.Fatalf("Unable to retrieve data for test_view: %v", err)
}
if len(rows) != 1 {
t.Fatalf("Expected one row, got %d rows: %+v", len(rows), rows)
}
if len(rows[0].Tags) != 2 {
t.Errorf("Wrong number of tags %d: %v", len(rows[0].Tags), rows[0].Tags)
}
// k2 was Insert() ed, and shouldn't update the value that was in the supplied context.
wantTags := []tag.Tag{{Key: k1, Value: "bar"}, {Key: k2, Value: "foo"}}
for i, tag := range rows[0].Tags {
if tag.Key != wantTags[i].Key {
t.Errorf("Incorrect tag %d, want: %q, got: %q", i, wantTags[i].Key, tag.Key)
}
if tag.Value != wantTags[i].Value {
t.Errorf("Incorrect tag for %s, want: %q, got: %v", tag.Key, wantTags[i].Value, tag.Value)
}

}
wantBuckets := []int64{0, 1, 1}
gotBuckets := rows[0].Data.(*view.DistributionData)
if !reflect.DeepEqual(gotBuckets.CountPerBucket, wantBuckets) {
t.Fatalf("want buckets %v, got %v", wantBuckets, gotBuckets)
}
for i, e := range gotBuckets.ExemplarsPerBucket {
if gotBuckets.CountPerBucket[i] == 0 {
if e != nil {
t.Errorf("Unexpected exemplar for bucket")
}
continue
}
// values from the metrics above
exemplarValues := []float64{0, 6, 12}
wantExemplar := &metricdata.Exemplar{Value: exemplarValues[i], Attachments: attachments}
if diff := cmpExemplar(e, wantExemplar); diff != "" {
t.Errorf("Bad exemplar for %d: %+v", i, diff)
}
}

rows2, err := meter.RetrieveData("second_view")
if err != nil {
t.Fatalf("Failed to read second_view: %v", err)
}
if len(rows2) != 1 {
t.Fatalf("Expected one row, got %d rows: %v", len(rows2), rows2)
}
if len(rows2[0].Tags) != 1 {
t.Errorf("Expected one tag, got %d tags: %v", len(rows2[0].Tags), rows2[0].Tags)
}
wantTags = []tag.Tag{{Key: k1, Value: "bar"}}
for i, tag := range rows2[0].Tags {
if wantTags[i].Key != tag.Key {
t.Errorf("Wrong key for %d, want %q, got %q", i, wantTags[i].Key, tag.Key)
}
if wantTags[i].Value != tag.Value {
t.Errorf("Wrong value for tag %s, want %q got %q", tag.Key, wantTags[i].Value, tag.Value)
}
}
gotCount := rows2[0].Data.(*view.CountData)
if gotCount.Value != 1 {
t.Errorf("Wrong count for second_view, want %d, got %d", 1, gotCount.Value)
}
}
56 changes: 40 additions & 16 deletions stats/view/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,29 +46,15 @@ var (
// BenchmarkRecordReqCommand benchmarks calling the internal recording machinery
// directly.
func BenchmarkRecordReqCommand(b *testing.B) {
w := newWorker()
w := NewMeter().(*worker)

register := &registerViewReq{views: []*View{view}, err: make(chan error, 1)}
register.handleCommand(w)
if err := <-register.err; err != nil {
b.Fatal(err)
}

const tagCount = 10
ctxs := make([]context.Context, 0, tagCount)
for i := 0; i < tagCount; i++ {
ctx, _ := tag.New(context.Background(),
tag.Upsert(k1, fmt.Sprintf("v%d", i)),
tag.Upsert(k2, fmt.Sprintf("v%d", i)),
tag.Upsert(k3, fmt.Sprintf("v%d", i)),
tag.Upsert(k4, fmt.Sprintf("v%d", i)),
tag.Upsert(k5, fmt.Sprintf("v%d", i)),
tag.Upsert(k6, fmt.Sprintf("v%d", i)),
tag.Upsert(k7, fmt.Sprintf("v%d", i)),
tag.Upsert(k8, fmt.Sprintf("v%d", i)),
)
ctxs = append(ctxs, ctx)
}
ctxs := prepareContexts(10)

b.ReportAllocs()
b.ResetTimer()
Expand All @@ -91,3 +77,41 @@ func BenchmarkRecordReqCommand(b *testing.B) {
record.handleCommand(w)
}
}

func BenchmarkRecordViaStats(b *testing.B) {

meter := NewMeter()
meter.Start()
defer meter.Stop()
meter.Register(view)
defer meter.Unregister(view)

ctxs := prepareContexts(10)
rec := stats.WithRecorder(meter)
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
stats.RecordWithOptions(ctxs[i%len(ctxs)], rec, stats.WithMeasurements(m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1)))
}

}

func prepareContexts(tagCount int) []context.Context {
ctxs := make([]context.Context, 0, tagCount)
for i := 0; i < tagCount; i++ {
ctx, _ := tag.New(context.Background(),
tag.Upsert(k1, fmt.Sprintf("v%d", i)),
tag.Upsert(k2, fmt.Sprintf("v%d", i)),
tag.Upsert(k3, fmt.Sprintf("v%d", i)),
tag.Upsert(k4, fmt.Sprintf("v%d", i)),
tag.Upsert(k5, fmt.Sprintf("v%d", i)),
tag.Upsert(k6, fmt.Sprintf("v%d", i)),
tag.Upsert(k7, fmt.Sprintf("v%d", i)),
tag.Upsert(k8, fmt.Sprintf("v%d", i)),
)
ctxs = append(ctxs, ctx)
}

return ctxs
}
17 changes: 2 additions & 15 deletions stats/view/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@

package view

import "sync"

var (
exportersMu sync.RWMutex // guards exporters
exporters = make(map[Exporter]struct{})
)

// Exporter exports the collected records as view data.
//
// The ExportView method should return quickly; if an
Expand All @@ -43,16 +36,10 @@ type Exporter interface {
//
// Binaries can register exporters, libraries shouldn't register exporters.
func RegisterExporter(e Exporter) {
exportersMu.Lock()
defer exportersMu.Unlock()

exporters[e] = struct{}{}
defaultWorker.RegisterExporter(e)
}

// UnregisterExporter unregisters an exporter.
func UnregisterExporter(e Exporter) {
exportersMu.Lock()
defer exportersMu.Unlock()

delete(exporters, e)
defaultWorker.UnregisterExporter(e)
}