Skip to content

Commit

Permalink
Report data for a given view when it is unregistered.
Browse files Browse the repository at this point in the history
This might help with census-instrumentation#773.
  • Loading branch information
knyar committed Jul 26, 2018
1 parent 9260bbf commit f60f5fd
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 20 deletions.
42 changes: 23 additions & 19 deletions stats/view/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,26 +194,30 @@ func (w *worker) tryRegisterView(v *View) (*viewInternal, error) {
return vi, nil
}

func (w *worker) reportView(v *viewInternal, now time.Time) {
if !v.isSubscribed() {
return
}
rows := v.collectedRows()
_, ok := w.startTimes[v]
if !ok {
w.startTimes[v] = now
}
viewData := &Data{
View: v.view,
Start: w.startTimes[v],
End: time.Now(),
Rows: rows,
}
exportersMu.Lock()
for e := range exporters {
e.ExportView(viewData)
}
exportersMu.Unlock()
}

func (w *worker) reportUsage(now time.Time) {
for _, v := range w.views {
if !v.isSubscribed() {
continue
}
rows := v.collectedRows()
_, ok := w.startTimes[v]
if !ok {
w.startTimes[v] = now
}
viewData := &Data{
View: v.view,
Start: w.startTimes[v],
End: time.Now(),
Rows: rows,
}
exportersMu.Lock()
for e := range exporters {
e.ExportView(viewData)
}
exportersMu.Unlock()
w.reportView(v, now)
}
}
3 changes: 3 additions & 0 deletions stats/view/worker_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func (cmd *unregisterFromViewReq) handleCommand(w *worker) {
continue
}

// Report pending data for this view before removing it.
w.reportView(vi, time.Now())

vi.unsubscribe()
if !vi.isSubscribed() {
// this was the last subscription and view is not collecting anymore.
Expand Down
39 changes: 38 additions & 1 deletion stats/view/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,9 +362,45 @@ func TestWorkerStarttime(t *testing.T) {
e.Unlock()
}

func TestUnregisterReportsUsage(t *testing.T) {
restart()
ctx := context.Background()

m1 := stats.Int64("measure", "desc", "unit")
view1 := &View{Name: "count", Measure: m1, Aggregation: Count()}
m2 := stats.Int64("measure2", "desc", "unit")
view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()}

SetReportingPeriod(time.Hour)

if err := Register(view1, view2); err != nil {
t.Fatalf("cannot register: %v", err)
}

e := &countExporter{}
RegisterExporter(e)

stats.Record(ctx, m1.M(1))
stats.Record(ctx, m2.M(1))
stats.Record(ctx, m2.M(1))

Unregister(view2)

// Unregister should only flush view2, so expect the count of 2.
want := int64(2)

e.Lock()
got := e.totalCount
e.Unlock()
if got != want {
t.Errorf("got count data = %v; want %v", got, want)
}
}

type countExporter struct {
sync.Mutex
count int64
count int64
totalCount int64
}

func (e *countExporter) ExportView(vd *Data) {
Expand All @@ -376,6 +412,7 @@ func (e *countExporter) ExportView(vd *Data) {
e.Lock()
defer e.Unlock()
e.count = d.Value
e.totalCount += d.Value
}

type vdExporter struct {
Expand Down

0 comments on commit f60f5fd

Please sign in to comment.