Skip to content

Commit

Permalink
Merge pull request #125 from dnephin/dnephin/stream-metrics
Browse files Browse the repository at this point in the history
inmem: allow streaming of metrics as intervals complete
  • Loading branch information
banks committed Jun 14, 2021
2 parents a054c40 + 01db687 commit f792dbc
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 21 deletions.
42 changes: 22 additions & 20 deletions inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type IntervalMetrics struct {
// Samples maps the key to an AggregateSample,
// which has the rolled up view of a sample
Samples map[string]SampledValue

// done is closed when this interval has ended, and a new IntervalMetrics
// has been created to receive any future metrics.
done chan struct{}
}

// NewIntervalMetrics creates a new IntervalMetrics for a given interval
Expand All @@ -65,6 +69,7 @@ func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
Points: make(map[string][]float32),
Counters: make(map[string]SampledValue),
Samples: make(map[string]SampledValue),
done: make(chan struct{}),
}
}

Expand Down Expand Up @@ -270,49 +275,46 @@ func (i *InmemSink) Data() []*IntervalMetrics {
return intervals
}

func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics {
i.intervalLock.RLock()
defer i.intervalLock.RUnlock()
// getInterval returns the current interval. A new interval is created if no
// previous interval exists, or if the current time is beyond the window for the
// current interval.
func (i *InmemSink) getInterval() *IntervalMetrics {
intv := time.Now().Truncate(i.interval)

// Attempt to return the existing interval first, because it only requires
// a read lock.
i.intervalLock.RLock()
n := len(i.intervals)
if n > 0 && i.intervals[n-1].Interval == intv {
defer i.intervalLock.RUnlock()
return i.intervals[n-1]
}
return nil
}
i.intervalLock.RUnlock()

func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics {
i.intervalLock.Lock()
defer i.intervalLock.Unlock()

// Check for an existing interval
n := len(i.intervals)
// Re-check for an existing interval now that the lock is re-acquired.
n = len(i.intervals)
if n > 0 && i.intervals[n-1].Interval == intv {
return i.intervals[n-1]
}

// Add the current interval
current := NewIntervalMetrics(intv)
i.intervals = append(i.intervals, current)
n++
if n > 0 {
close(i.intervals[n-1].done)
}

// Truncate the intervals if they are too long
n++
// Prune old intervals if the count exceeds the max.
if n >= i.maxIntervals {
copy(i.intervals[0:], i.intervals[n-i.maxIntervals:])
i.intervals = i.intervals[:i.maxIntervals]
}
return current
}

// getInterval returns the current interval to write to
func (i *InmemSink) getInterval() *IntervalMetrics {
intv := time.Now().Truncate(i.interval)
if m := i.getExistingInterval(intv); m != nil {
return m
}
return i.createInterval(intv)
}

// Flattens the key for formatting, removes spaces
func (i *InmemSink) flattenKey(parts []string) string {
buf := &bytes.Buffer{}
Expand Down
33 changes: 32 additions & 1 deletion inmem_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"context"
"fmt"
"net/http"
"sort"
Expand Down Expand Up @@ -68,6 +69,10 @@ func (i *InmemSink) DisplayMetrics(resp http.ResponseWriter, req *http.Request)
interval = data[n-2]
}

return newMetricSummaryFromInterval(interval), nil
}

func newMetricSummaryFromInterval(interval *IntervalMetrics) MetricsSummary {
interval.RLock()
defer interval.RUnlock()

Expand Down Expand Up @@ -103,7 +108,7 @@ func (i *InmemSink) DisplayMetrics(resp http.ResponseWriter, req *http.Request)
summary.Counters = formatSamples(interval.Counters)
summary.Samples = formatSamples(interval.Samples)

return summary, nil
return summary
}

func formatSamples(source map[string]SampledValue) []SampledValue {
Expand All @@ -129,3 +134,29 @@ func formatSamples(source map[string]SampledValue) []SampledValue {

return output
}

// Encoder is the minimal interface required by Stream to write a metrics
// summary to a client. *json.Encoder satisfies this interface.
type Encoder interface {
	// Encode writes one value to the underlying stream, returning any
	// encoding or write error.
	Encode(interface{}) error
}

// Stream writes metrics using encoder.Encode each time an interval ends. Runs
// until the request context is cancelled, or the encoder returns an error.
// The caller is responsible for logging any errors from encoder.
func (i *InmemSink) Stream(ctx context.Context, encoder Encoder) {
	current := i.getInterval()

	for {
		select {
		case <-ctx.Done():
			// The request was cancelled; stop streaming.
			return
		case <-current.done:
			// The interval has ended and will receive no further metrics,
			// so it is safe to summarize and emit it now.
			if err := encoder.Encode(newMetricSummaryFromInterval(current)); err != nil {
				return
			}
			// Advance to the interval that is now accumulating metrics.
			current = i.getInterval()
		}
	}
}
74 changes: 74 additions & 0 deletions inmem_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package metrics

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"

Expand Down Expand Up @@ -273,3 +278,72 @@ func TestDisplayMetrics_RaceMetricsSetGauge(t *testing.T) {
verify.Values(t, "all", got, float32(42))
}

// TestInmemSink_Stream verifies that Stream emits one metrics summary each
// time an interval completes, and that successive summaries report strictly
// increasing gauge values.
func TestInmemSink_Stream(t *testing.T) {
	interval := 10 * time.Millisecond
	total := 50 * time.Millisecond
	inm := NewInmemSink(interval, total)

	// Stop both the writer goroutine and Stream after two retention windows
	// have elapsed.
	ctx, cancel := context.WithTimeout(context.Background(), total*2)
	defer cancel()

	chDone := make(chan struct{})

	// Emit a gauge, key, counter, and sample metric a few times per interval,
	// with values that increase on every iteration.
	go func() {
		for i := float32(0); ctx.Err() == nil; i++ {
			inm.SetGaugeWithLabels([]string{"gauge", "foo"}, 20+i, []Label{{"a", "b"}})
			inm.EmitKey([]string{"key", "foo"}, 30+i)
			inm.IncrCounterWithLabels([]string{"counter", "bar"}, 40+i, []Label{{"a", "b"}})
			inm.IncrCounterWithLabels([]string{"counter", "bar"}, 50+i, []Label{{"a", "b"}})
			inm.AddSampleWithLabels([]string{"sample", "bar"}, 60+i, []Label{{"a", "b"}})
			inm.AddSampleWithLabels([]string{"sample", "bar"}, 70+i, []Label{{"a", "b"}})
			time.Sleep(interval / 3)
		}
		close(chDone)
	}()

	resp := httptest.NewRecorder()
	enc := encoder{
		encoder: json.NewEncoder(resp),
		flusher: resp,
	}
	inm.Stream(ctx, enc)

	<-chDone

	decoder := json.NewDecoder(resp.Body)
	var prevGaugeValue float32
	for i := 0; i < 8; i++ {
		var summary MetricsSummary
		if err := decoder.Decode(&summary); err != nil {
			t.Fatalf("expected no error while decoding response %d, got %v", i, err)
		}
		if count := len(summary.Gauges); count != 1 {
			t.Fatalf("expected exactly one gauge in response %d, got %v", i, count)
		}
		value := summary.Gauges[0].Value
		// The upper bound of the gauge value is not known exactly, but we can
		// expect it to be less than 50 because it increments by ~3 every
		// interval and we run for ~10 intervals.
		if value < 20 || value > 50 {
			t.Fatalf("expected interval %d gauge value between 20 and 50, got %v", i, value)
		}
		if value <= prevGaugeValue {
			t.Fatalf("expected interval %d gauge value to be greater than previous, %v <= %v", i, value, prevGaugeValue)
		}
		prevGaugeValue = value
	}
}

type encoder struct {
flusher http.Flusher
encoder *json.Encoder
}

func (e encoder) Encode(metrics interface{}) error {
if err := e.encoder.Encode(metrics); err != nil {
fmt.Println("failed to encode metrics summary", "error", err)
return err
}
e.flusher.Flush()
return nil
}

0 comments on commit f792dbc

Please sign in to comment.