Skip to content

Commit

Permalink
Make it easier to manually report accumulated data
Browse files Browse the repository at this point in the history
- Report data for a given view when that view is unregistered;
- Add an explicit command to report all accumulated data.

This probably fixes census-instrumentation#773.
  • Loading branch information
knyar committed Jul 25, 2018
1 parent 9260bbf commit 80f3fa2
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 20 deletions.
52 changes: 33 additions & 19 deletions stats/view/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,16 @@ func SetReportingPeriod(d time.Duration) {
<-req.c // don't return until the timer is set to the new duration.
}

// ReportUsage immediately reports collected data to all registered exporters
// outside of the regular reporting cycle.
func ReportUsage() {
req := &reportUsageReq{
done: make(chan struct{}),
}
defaultWorker.c <- req
<-req.done
}

func newWorker() *worker {
return &worker{
measures: make(map[string]*measureRef),
Expand Down Expand Up @@ -194,26 +204,30 @@ func (w *worker) tryRegisterView(v *View) (*viewInternal, error) {
return vi, nil
}

func (w *worker) reportView(v *viewInternal, now time.Time) {
if !v.isSubscribed() {
return
}
rows := v.collectedRows()
_, ok := w.startTimes[v]
if !ok {
w.startTimes[v] = now
}
viewData := &Data{
View: v.view,
Start: w.startTimes[v],
End: time.Now(),
Rows: rows,
}
exportersMu.Lock()
for e := range exporters {
e.ExportView(viewData)
}
exportersMu.Unlock()
}

func (w *worker) reportUsage(now time.Time) {
for _, v := range w.views {
if !v.isSubscribed() {
continue
}
rows := v.collectedRows()
_, ok := w.startTimes[v]
if !ok {
w.startTimes[v] = now
}
viewData := &Data{
View: v.view,
Start: w.startTimes[v],
End: time.Now(),
Rows: rows,
}
exportersMu.Lock()
for e := range exporters {
e.ExportView(viewData)
}
exportersMu.Unlock()
w.reportView(v, now)
}
}
14 changes: 14 additions & 0 deletions stats/view/worker_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func (cmd *unregisterFromViewReq) handleCommand(w *worker) {
continue
}

// Report pending data for this view before removing it.
w.reportView(vi, time.Now())

vi.unsubscribe()
if !vi.isSubscribed() {
// this was the last subscription and view is not collecting anymore.
Expand Down Expand Up @@ -169,3 +172,14 @@ func (cmd *setReportingPeriodReq) handleCommand(w *worker) {
}
cmd.c <- true
}

// reportUsageReq is a command to immediately report collected data
// outside of the regular reporting cycle.
type reportUsageReq struct {
done chan struct{}
}

func (cmd *reportUsageReq) handleCommand(w *worker) {
w.reportUsage(time.Now())
cmd.done <- struct{}{}
}
74 changes: 73 additions & 1 deletion stats/view/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,9 +362,80 @@ func TestWorkerStarttime(t *testing.T) {
e.Unlock()
}

func TestReportUsageCommand(t *testing.T) {
restart()
ctx := context.Background()

m1 := stats.Int64("measure", "desc", "unit")
view1 := &View{Name: "count", Measure: m1, Aggregation: Count()}
m2 := stats.Int64("measure2", "desc", "unit")
view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()}

SetReportingPeriod(5 * time.Second)

if err := Register(view1, view2); err != nil {
t.Fatalf("cannot register: %v", err)
}

e := &countExporter{}
RegisterExporter(e)

stats.Record(ctx, m1.M(1))
stats.Record(ctx, m2.M(1))
stats.Record(ctx, m2.M(1))

ReportUsage()

// ReportUsage should flush all views, so expect the count of 3.
want := int64(3)

e.Lock()
got := e.totalCount
e.Unlock()
if got != want {
t.Errorf("got count data = %v; want %v", got, want)
}
}

func TestUnregisterReportsUsage(t *testing.T) {
restart()
ctx := context.Background()

m1 := stats.Int64("measure", "desc", "unit")
view1 := &View{Name: "count", Measure: m1, Aggregation: Count()}
m2 := stats.Int64("measure2", "desc", "unit")
view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()}

SetReportingPeriod(5 * time.Second)

if err := Register(view1, view2); err != nil {
t.Fatalf("cannot register: %v", err)
}

e := &countExporter{}
RegisterExporter(e)

stats.Record(ctx, m1.M(1))
stats.Record(ctx, m2.M(1))
stats.Record(ctx, m2.M(1))

Unregister(view2)

// Unregister should only flush view2, so expect the count of 2.
want := int64(2)

e.Lock()
got := e.totalCount
e.Unlock()
if got != want {
t.Errorf("got count data = %v; want %v", got, want)
}
}

type countExporter struct {
sync.Mutex
count int64
count int64
totalCount int64
}

func (e *countExporter) ExportView(vd *Data) {
Expand All @@ -376,6 +447,7 @@ func (e *countExporter) ExportView(vd *Data) {
e.Lock()
defer e.Unlock()
e.count = d.Value
e.totalCount += d.Value
}

type vdExporter struct {
Expand Down

0 comments on commit 80f3fa2

Please sign in to comment.