Skip to content

Commit

Permalink
inmem: add support for streaming metrics as intervals complete
Browse files Browse the repository at this point in the history
This will allow `consul debug` to include all the metrics, instead of
missing many metrics.
  • Loading branch information
dnephin committed Jun 10, 2021
1 parent 96c40ac commit b1da2f8
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 2 deletions.
6 changes: 5 additions & 1 deletion inmem.go
Expand Up @@ -69,6 +69,7 @@ func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
Points: make(map[string][]float32),
Counters: make(map[string]SampledValue),
Samples: make(map[string]SampledValue),
done: make(chan struct{}),
}
}

Expand Down Expand Up @@ -301,8 +302,11 @@ func (i *InmemSink) getInterval() *IntervalMetrics {

current := NewIntervalMetrics(intv)
i.intervals = append(i.intervals, current)
n++
if n > 0 {
close(i.intervals[n-1].done)
}

n++
// Prune old intervals if the count exceeds the max.
if n >= i.maxIntervals {
copy(i.intervals[0:], i.intervals[n-i.maxIntervals:])
Expand Down
49 changes: 48 additions & 1 deletion inmem_endpoint.go
@@ -1,6 +1,7 @@
package metrics

import (
"encoding/json"
"fmt"
"net/http"
"sort"
Expand Down Expand Up @@ -68,6 +69,10 @@ func (i *InmemSink) DisplayMetrics(resp http.ResponseWriter, req *http.Request)
interval = data[n-2]
}

return newMetricSummaryFromInterval(interval), nil
}

func newMetricSummaryFromInterval(interval *IntervalMetrics) MetricsSummary {
interval.RLock()
defer interval.RUnlock()

Expand Down Expand Up @@ -103,7 +108,7 @@ func (i *InmemSink) DisplayMetrics(resp http.ResponseWriter, req *http.Request)
summary.Counters = formatSamples(interval.Counters)
summary.Samples = formatSamples(interval.Samples)

return summary, nil
return summary
}

func formatSamples(source map[string]SampledValue) []SampledValue {
Expand All @@ -129,3 +134,45 @@ func formatSamples(source map[string]SampledValue) []SampledValue {

return output
}

// Stream writes metrics to resp each time an interval ends. Runs until the
// request context is cancelled.
func (i *InmemSink) Stream(resp http.ResponseWriter, req *http.Request) {
interval := i.getInterval()
ctx := req.Context()

resp.WriteHeader(http.StatusOK)
flusher, ok := resp.(http.Flusher)
if ok {
// call Write with 0 bytes before a flush, so that GzipResponseWriter
// can write response headers.
resp.Write([]byte(""))
flusher.Flush()
} else {
flusher = noopFlusher{}
}

encoder := json.NewEncoder(resp)

for {
select {
case <-interval.done:
summary := newMetricSummaryFromInterval(interval)

if err := encoder.Encode(summary); err != nil {
// TODO: pass in a logger to log this error
return
}
flusher.Flush()

// update interval to the next one
interval = i.getInterval()
case <-ctx.Done():
return
}
}
}

type noopFlusher struct{}

func (noopFlusher) Flush() {}
54 changes: 54 additions & 0 deletions inmem_endpoint_test.go
@@ -1,6 +1,10 @@
package metrics

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"

Expand Down Expand Up @@ -273,3 +277,53 @@ func TestDisplayMetrics_RaceMetricsSetGauge(t *testing.T) {
verify.Values(t, "all", got, float32(42))
}

func TestInmemSink_Stream(t *testing.T) {
interval := 10 * time.Millisecond
total := 50 * time.Millisecond
inm := NewInmemSink(interval, total)

ctx, cancel := context.WithTimeout(context.Background(), total*2)
defer cancel()

chDone := make(chan struct{})

go func() {
for i := float32(0); ctx.Err() == nil; i++ {
inm.SetGaugeWithLabels([]string{"gauge", "foo"}, 20+i, []Label{{"a", "b"}})
inm.EmitKey([]string{"key", "foo"}, 30+i)
inm.IncrCounterWithLabels([]string{"counter", "bar"}, 40+i, []Label{{"a", "b"}})
inm.IncrCounterWithLabels([]string{"counter", "bar"}, 50+i, []Label{{"a", "b"}})
inm.AddSampleWithLabels([]string{"sample", "bar"}, 60+i, []Label{{"a", "b"}})
inm.AddSampleWithLabels([]string{"sample", "bar"}, 70+i, []Label{{"a", "b"}})
time.Sleep(interval / 3)
}
close(chDone)
}()

resp := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet, "/", nil)
req = req.WithContext(ctx)
inm.Stream(resp, req)

<-chDone

decoder := json.NewDecoder(resp.Body)
var prevGaugeValue float32
for i := 0; i < 8; i++ {
var summary MetricsSummary
if err := decoder.Decode(&summary); err != nil {
t.Fatalf("expected no error while decoding response %d, got %v", i, err)
}
if count := len(summary.Gauges); count != 1 {
t.Fatalf("expected at least one gauge in response %d, got %v", i, count)
}
value := summary.Gauges[0].Value
if value < 20 || value > 50 {
t.Fatalf("expected interval %d guage value between 20 and 50, got %v", i, value)
}
if value <= prevGaugeValue {
t.Fatalf("expected interval %d guage value to be greater than previous, %v == %v", i, value, prevGaugeValue)
}
prevGaugeValue = value
}
}

0 comments on commit b1da2f8

Please sign in to comment.