Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
Change record interface to include WithMeter option, per @rghetia
Browse files Browse the repository at this point in the history
  • Loading branch information
Evan Anderson committed Feb 12, 2020
1 parent c96d579 commit c4c5f2d
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 55 deletions.
42 changes: 16 additions & 26 deletions stats/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,17 @@ func init() {
}
}

// ResolvedOptions can be used to extract the current tags and measurements
// from context and stats arguments when using custom workers to export stats
// to a separate exporter.
type ResolvedOptions struct {
Attachments metricdata.Attachments
Tags *tag.Map
Measures []Measurement
// Recorder is a subset of the view.Meter interface which only includes
// the Record method, to avoid circular imports between stats and view.
type Recorder interface {
Record(*tag.Map, interface{}, map[string]interface{})
}

type recordOptions struct {
attachments metricdata.Attachments
mutators []tag.Mutator
measurements []Measurement
recorder Recorder
}

// WithAttachments applies provided exemplar attachments.
Expand All @@ -67,6 +65,14 @@ func WithMeasurements(measurements ...Measurement) Options {
}
}

// WithMeter records the measurements to the specified `view.Meter`, rather
// than to the global metrics recorder.
func WithMeter(meter Recorder) Options {
return func(ro *recordOptions) {
ro.recorder = meter
}
}

// Options apply changes to recordOptions.
type Options func(*recordOptions)

Expand All @@ -93,23 +99,6 @@ func RecordWithTags(ctx context.Context, mutators []tag.Mutator, ms ...Measureme
return RecordWithOptions(ctx, WithTags(mutators...), WithMeasurements(ms...))
}

// ResolveOptions determines the full set of Tags, Measurements, etc from the
// provided Options and context.Context. This is mostly useful when using
// multiple exporters.
func ResolveOptions(ctx context.Context, ros ...Options) (*ResolvedOptions, error) {
o := createRecordOption(ros...)

if len(o.mutators) > 0 {
var err error
if ctx, err = tag.New(ctx, o.mutators...); err != nil {
return nil, err
}
}
return &ResolvedOptions{Tags: tag.FromContext(ctx),
Measures: o.measurements,
Attachments: o.attachments}, nil
}

// RecordWithOptions records measurements from the given options (if any) against context
// and tags and attachments in the options (if any).
// If there are any tags in the context, measurements will be tagged with them.
Expand All @@ -118,9 +107,10 @@ func RecordWithOptions(ctx context.Context, ros ...Options) error {
if len(o.measurements) == 0 {
return nil
}
// This could use ResolveOptions, but it does additional work to
// short-circuit if there are no metrics that need to be exported.
recorder := internal.DefaultRecorder
if o.recorder != nil {
recorder = o.recorder.Record
}
if recorder == nil {
return nil
}
Expand Down
84 changes: 63 additions & 21 deletions stats/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ func cmpExemplar(got, want *metricdata.Exemplar) string {
}

func TestResolveOptions(t *testing.T) {
meter := view.NewWorker()
meter.Start()
k1 := tag.MustNewKey("k1")
k2 := tag.MustNewKey("k2")
m1 := stats.Int64("TestResolveOptions/m1", "", stats.UnitDimensionless)
Expand All @@ -111,48 +113,88 @@ func TestResolveOptions(t *testing.T) {
Measure: m2,
Aggregation: view.Count(),
}}
view.SetReportingPeriod(100 * time.Millisecond)
if err := view.Register(v...); err != nil {
meter.SetReportingPeriod(100 * time.Millisecond)
if err := meter.Register(v...); err != nil {
t.Fatalf("Failed to register view: %v", err)
}
defer view.Unregister(v...)
defer meter.Unregister(v...)

attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: spanCtx}
ctx, err := tag.New(context.Background(), tag.Insert(k1, "foo"), tag.Insert(k2, "foo"))
if err != nil {
t.Fatalf("Failed to set context: %v", err)
}
ro, err := stats.ResolveOptions(ctx,
err = stats.RecordWithOptions(ctx,
stats.WithTags(tag.Upsert(k1, "bar"), tag.Insert(k2, "bar")),
stats.WithAttachments(attachments),
stats.WithMeasurements(m1.M(12), m2.M(5)))
stats.WithMeasurements(m1.M(12), m1.M(6), m2.M(5)),
stats.WithMeter(meter))
if err != nil {
t.Fatalf("Failed to resolve data point: %v", err)
}

s, ok := ro.Attachments[metricdata.AttachmentKeySpanContext]
if !ok || s != spanCtx {
t.Errorf("Unexpected SpanContext: want %v, got %v", spanCtx, s)
rows, err := meter.RetrieveData("test_view")
if err != nil {
t.Fatalf("Unable to retrieve data for test_view: %v", err)
}
if len(ro.Attachments) != 1 {
t.Errorf("Expected only one attachment (SpanContext), got %v", ro.Attachments)
if len(rows) != 1 {
t.Fatalf("Expected one row, got %d rows: %+v", len(rows), rows)
}
if len(rows[0].Tags) != 2 {
t.Errorf("Wrong number of tags %d: %v", len(rows[0].Tags), rows[0].Tags)
}
// k2 was Insert() ed, and shouldn't update the value that was in the supplied context.
wantTags := []tag.Tag{{Key: k1, Value: "bar"}, {Key: k2, Value: "foo"}}
for i, tag := range rows[0].Tags {
if tag.Key != wantTags[i].Key {
t.Errorf("Incorrect tag %d, want: %q, got: %q", i, wantTags[i].Key, tag.Key)
}
if tag.Value != wantTags[i].Value {
t.Errorf("Incorrect tag for %s, want: %q, got: %v", tag.Key, wantTags[i].Value, tag.Value)
}

if len(ro.Measures) != 2 {
t.Errorf("Expected two measurements, got %v", ro.Measures)
}
mWant := []stats.Measurement{m1.M(12), m2.M(5)}
if ro.Measures[0] != mWant[0] || ro.Measures[1] != mWant[1] {
t.Errorf("Unexpected measurements: want %v, got %v", mWant, ro.Measures)
wantBuckets := []int64{0, 1, 1}
gotBuckets := rows[0].Data.(*view.DistributionData)
if !reflect.DeepEqual(gotBuckets.CountPerBucket, wantBuckets) {
t.Fatalf("want buckets %v, got %v", wantBuckets, gotBuckets)
}
for i, e := range gotBuckets.ExemplarsPerBucket {
if gotBuckets.CountPerBucket[i] == 0 {
if e != nil {
t.Errorf("Unexpected exemplar for bucket")
}
continue
}
// values from the metrics above
exemplarValues := []float64{0, 6, 12}
wantExemplar := &metricdata.Exemplar{Value: exemplarValues[i], Attachments: attachments}
if diff := cmpExemplar(e, wantExemplar); diff != "" {
t.Errorf("Bad exemplar for %d: %+v", i, diff)
}
}

// k2 was Insert() ed, and shouldn't update the value that was in the supplied context.
tCtx, err := tag.New(context.Background(), tag.Insert(k1, "bar"), tag.Insert(k2, "foo"))
rows2, err := meter.RetrieveData("second_view")
if err != nil {
t.Fatalf("Failed to construct tWant: %v", err)
t.Fatalf("Failed to read second_view: %v", err)
}
if len(rows2) != 1 {
t.Fatalf("Expected one row, got %d rows: %v", len(rows2), rows2)
}
if len(rows2[0].Tags) != 1 {
t.Errorf("Expected one tag, got %d tags: %v", len(rows2[0].Tags), rows2[0].Tags)
}
wantTags = []tag.Tag{{Key: k1, Value: "bar"}}
for i, tag := range rows2[0].Tags {
if wantTags[i].Key != tag.Key {
t.Errorf("Wrong key for %d, want %q, got %q", i, wantTags[i].Key, tag.Key)
}
if wantTags[i].Value != tag.Value {
t.Errorf("Wrong value for tag %s, want %q got %q", tag.Key, wantTags[i].Value, tag.Value)
}
}
tWant := tag.FromContext(tCtx)
if ro.Tags.String() != tWant.String() {
t.Errorf("Unexpected tags: want %v, got %v", tWant, ro.Tags)
gotCount := rows2[0].Data.(*view.CountData)
if gotCount.Value != 1 {
t.Errorf("Wrong count for second_view, want %d, got %d", 1, gotCount.Value)
}
}
18 changes: 12 additions & 6 deletions stats/view/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type worker struct {
// module should cover the common use cases.
type Meter interface {
// Record records a set of measurements ms associated with the given tags and attachments.
Record(tags *tag.Map, ms []stats.Measurement, attachments map[string]interface{})
Record(tags *tag.Map, ms interface{}, attachments map[string]interface{})
// Find returns a registered view associated with this name.
// If no registered view is found, nil is returned.
Find(name string) *View
Expand Down Expand Up @@ -98,6 +98,10 @@ type Meter interface {
Start()
// Stop causes the Meter to stop processing calls and terminate data export.
Stop()

// RetrieveData gets a snapshot of the data collected for the the view registered
// with the given name. It is intended for testing only.
RetrieveData(viewName string) ([]*Row, error)
}

var _ Meter = (*worker)(nil)
Expand Down Expand Up @@ -169,10 +173,12 @@ func (w *worker) Unregister(views ...*View) {
// RetrieveData gets a snapshot of the data collected for the the view registered
// with the given name. It is intended for testing only.
func RetrieveData(viewName string) ([]*Row, error) {
return defaultWorker.retrieveData(viewName)
return defaultWorker.RetrieveData(viewName)
}

func (w *worker) retrieveData(viewName string) ([]*Row, error) {
// RetrieveData gets a snapshot of the data collected for the the view registered
// with the given name. It is intended for testing only.
func (w *worker) RetrieveData(viewName string) ([]*Row, error) {
req := &retrieveDataReq{
now: time.Now(),
v: viewName,
Expand All @@ -184,14 +190,14 @@ func (w *worker) retrieveData(viewName string) ([]*Row, error) {
}

func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
defaultWorker.Record(tags, ms.([]stats.Measurement), attachments)
defaultWorker.Record(tags, ms, attachments)
}

// Record records a set of measurements ms associated with the given tags and attachments.
func (w *worker) Record(tags *tag.Map, ms []stats.Measurement, attachments map[string]interface{}) {
func (w *worker) Record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
req := &recordReq{
tm: tags,
ms: ms,
ms: ms.([]stats.Measurement),
attachments: attachments,
t: time.Now(),
}
Expand Down
4 changes: 2 additions & 2 deletions stats/view/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func Test_Worker_MultiExport(t *testing.T) {
for _, wantRow := range wantRows {
retrieve := RetrieveData
if wantRow.w != nil {
retrieve = wantRow.w.(*worker).retrieveData
retrieve = wantRow.w.(*worker).RetrieveData
}
gotRows, err := retrieve(wantRow.view)
if err != nil {
Expand All @@ -207,7 +207,7 @@ func Test_Worker_MultiExport(t *testing.T) {
}
}
// Verify that worker has not been computing sum:
got, err := worker2.retrieveData(sum.Name)
got, err := worker2.RetrieveData(sum.Name)
if err == nil {
t.Errorf("%s: expected no data because it was not registered, got %#v", sum.Name, got)
}
Expand Down

0 comments on commit c4c5f2d

Please sign in to comment.