Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use batch values API for CloudWatch PutMetric data call #960

Merged
merged 3 commits into from Feb 20, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 26 additions & 4 deletions metrics/cloudwatch/cloudwatch.go
Expand Up @@ -20,6 +20,7 @@ import (

const (
maxConcurrentRequests = 20
maxValuesInABatch = 150
)

type Percentiles []struct {
Expand Down Expand Up @@ -174,13 +175,34 @@ func (cw *CloudWatch) Send() error {
})

cw.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
value := last(values)
datums = append(datums, &cloudwatch.MetricDatum{
datum := &cloudwatch.MetricDatum{
MetricName: aws.String(name),
Dimensions: makeDimensions(lvs...),
Value: aws.Float64(value),
Timestamp: aws.Time(now),
})
}

if l := len(values); l > 1 {
// CloudWatch Put Metrics API (https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html)
// expects batch of unique values including the array of corresponding counts
valuesCounter := make(map[float64]int)
for _, v := range values {
valuesCounter[v]++
}

for value, count := range valuesCounter {
if len(datum.Values) == maxValuesInABatch {
break
}
datum.Values = append(datum.Values, aws.Float64(value))
datum.Counts = append(datum.Counts, aws.Float64(float64(count)))
}
} else if l == 1 {
datum.Value = aws.Float64(values[0])
taraspos marked this conversation as resolved.
Show resolved Hide resolved
} else {
return true
}

datums = append(datums, datum)
return true
})

Expand Down
72 changes: 55 additions & 17 deletions metrics/cloudwatch/cloudwatch_test.go
Expand Up @@ -18,13 +18,13 @@ import (
type mockCloudWatch struct {
cloudwatchiface.CloudWatchAPI
mtx sync.RWMutex
valuesReceived map[string]float64
valuesReceived map[string][]float64
dimensionsReceived map[string][]*cloudwatch.Dimension
}

func newMockCloudWatch() *mockCloudWatch {
return &mockCloudWatch{
valuesReceived: map[string]float64{},
valuesReceived: map[string][]float64{},
dimensionsReceived: map[string][]*cloudwatch.Dimension{},
}
}
Expand All @@ -33,7 +33,13 @@ func (mcw *mockCloudWatch) PutMetricData(input *cloudwatch.PutMetricDataInput) (
mcw.mtx.Lock()
defer mcw.mtx.Unlock()
for _, datum := range input.MetricData {
mcw.valuesReceived[*datum.MetricName] = *datum.Value
if len(datum.Values) > 0 {
for _, v := range datum.Values {
mcw.valuesReceived[*datum.MetricName] = append(mcw.valuesReceived[*datum.MetricName], *v)
}
} else {
mcw.valuesReceived[*datum.MetricName] = append(mcw.valuesReceived[*datum.MetricName], *datum.Value)
}
mcw.dimensionsReceived[*datum.MetricName] = datum.Dimensions
}
return nil, nil
Expand Down Expand Up @@ -76,13 +82,15 @@ func TestCounter(t *testing.T) {
cw := New(namespace, svc, WithLogger(log.NewNopLogger()))
counter := cw.NewCounter(name).With(label, value)
valuef := func() float64 {
err := cw.Send()
if err != nil {
if err := cw.Send(); err != nil {
t.Fatal(err)
}
svc.mtx.RLock()
defer svc.mtx.RUnlock()
return svc.valuesReceived[name]
value := svc.valuesReceived[name][len(svc.valuesReceived[name])-1]
delete(svc.valuesReceived, name)

return value
}
if err := teststat.TestCounter(counter, valuef); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -123,7 +131,13 @@ func TestCounterLowSendConcurrency(t *testing.T) {
}

for i, name := range names {
if svc.valuesReceived[name] != wants[i] {
if l := len(svc.valuesReceived[name]); l == 0 && wants[i] == 0 {
continue
} else if l != 1 {
t.Fatalf("one value expected, got %d", l)
}

if svc.valuesReceived[name][0] != wants[i] {
t.Fatalf("want %f, have %f", wants[i], svc.valuesReceived[name])
}
if err := svc.testDimensions(name, labels[i], values[i]); err != nil {
Expand All @@ -138,15 +152,17 @@ func TestGauge(t *testing.T) {
svc := newMockCloudWatch()
cw := New(namespace, svc, WithLogger(log.NewNopLogger()))
gauge := cw.NewGauge(name).With(label, value)
valuef := func() float64 {
err := cw.Send()
if err != nil {
valuef := func() []float64 {
if err := cw.Send(); err != nil {
t.Fatal(err)
}
svc.mtx.RLock()
res := svc.valuesReceived[name]
delete(svc.valuesReceived, name)
defer svc.mtx.RUnlock()
return svc.valuesReceived[name]
return res
}

if err := teststat.TestGauge(gauge, valuef); err != nil {
t.Fatal(err)
}
Expand All @@ -170,12 +186,28 @@ func TestHistogram(t *testing.T) {
if err != nil {
t.Fatal(err)
}

svc.mtx.RLock()
defer svc.mtx.RUnlock()
p50 = svc.valuesReceived[n50]
p90 = svc.valuesReceived[n90]
p95 = svc.valuesReceived[n95]
p99 = svc.valuesReceived[n99]
if len(svc.valuesReceived[n50]) > 0 {
p50 = svc.valuesReceived[n50][0]
delete(svc.valuesReceived, n50)
}

if len(svc.valuesReceived[n90]) > 0 {
p90 = svc.valuesReceived[n90][0]
delete(svc.valuesReceived, n90)
}

if len(svc.valuesReceived[n95]) > 0 {
p95 = svc.valuesReceived[n95][0]
delete(svc.valuesReceived, n95)
}

if len(svc.valuesReceived[n99]) > 0 {
p99 = svc.valuesReceived[n99][0]
delete(svc.valuesReceived, n99)
}
return
}
if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
Expand Down Expand Up @@ -207,8 +239,14 @@ func TestHistogram(t *testing.T) {
}
svc.mtx.RLock()
defer svc.mtx.RUnlock()
p50 = svc.valuesReceived[n50]
p90 = svc.valuesReceived[n90]
if len(svc.valuesReceived[n50]) > 0 {
p50 = svc.valuesReceived[n50][0]
delete(svc.valuesReceived, n50)
}
if len(svc.valuesReceived[n90]) > 0 {
p90 = svc.valuesReceived[n90][0]
delete(svc.valuesReceived, n90)
}

// our teststat.TestHistogram wants us to give p95 and p99,
// but with custom percentiles we don't have those.
Expand Down
2 changes: 1 addition & 1 deletion metrics/expvar/expvar_test.go
Expand Up @@ -17,7 +17,7 @@ func TestCounter(t *testing.T) {

func TestGauge(t *testing.T) {
gauge := NewGauge("expvar_gauge").With("label values", "not supported").(*Gauge)
value := func() float64 { f, _ := strconv.ParseFloat(gauge.f.String(), 64); return f }
value := func() []float64 { f, _ := strconv.ParseFloat(gauge.f.String(), 64); return []float64{f} }
if err := teststat.TestGauge(gauge, value); err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion metrics/generic/generic_test.go
Expand Up @@ -45,7 +45,7 @@ func TestGauge(t *testing.T) {
if want, have := name, gauge.Name; want != have {
t.Errorf("Name: want %q, have %q", want, have)
}
value := gauge.Value
value := func() []float64 { return []float64{gauge.Value()} }
if err := teststat.TestGauge(gauge, value); err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions metrics/influx/influx_test.go
Expand Up @@ -34,12 +34,12 @@ func TestGauge(t *testing.T) {
in := New(map[string]string{"foo": "alpha"}, influxdb.BatchPointsConfig{}, log.NewNopLogger())
re := regexp.MustCompile(`influx_gauge,foo=alpha value=([0-9\.]+) [0-9]+`)
gauge := in.NewGauge("influx_gauge")
value := func() float64 {
value := func() []float64 {
client := &bufWriter{}
in.WriteTo(client)
match := re.FindStringSubmatch(client.buf.String())
f, _ := strconv.ParseFloat(match[1], 64)
return f
return []float64{f}
}
if err := teststat.TestGauge(gauge, value); err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion metrics/pcp/pcp_test.go
Expand Up @@ -40,7 +40,7 @@ func TestGauge(t *testing.T) {

gauge = gauge.With("label values", "not supported").(*Gauge)

value := func() float64 { f := gauge.g.Val(); return f }
value := func() []float64 { f := gauge.g.Val(); return []float64{f} }
if err := teststat.TestGauge(gauge, value); err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions metrics/prometheus/prometheus_test.go
Expand Up @@ -68,10 +68,10 @@ func TestGauge(t *testing.T) {
Help: "This is a different help string.",
}, []string{"foo"}).With("foo", "bar")

value := func() float64 {
value := func() []float64 {
matches := re.FindStringSubmatch(scrape())
f, _ := strconv.ParseFloat(matches[1], 64)
return f
return []float64{f}
}

if err := teststat.TestGauge(gauge, value); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions metrics/teststat/buffers.go
Expand Up @@ -23,10 +23,10 @@ func SumLines(w io.WriterTo, regex string) func() float64 {
// LastLine expects a regex whose first capture group can be parsed as a
// float64. It will dump the WriterTo and parse each line, expecting to find a
// match. It returns the final captured float.
func LastLine(w io.WriterTo, regex string) func() float64 {
return func() float64 {
func LastLine(w io.WriterTo, regex string) func() []float64 {
return func() []float64 {
_, final := stats(w, regex, nil)
return final
return []float64{final}
}
}

Expand Down
11 changes: 6 additions & 5 deletions metrics/teststat/teststat.go
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math"
"math/rand"
"reflect"
"strings"

"github.com/go-kit/kit/metrics"
Expand Down Expand Up @@ -38,24 +39,24 @@ func FillCounter(counter metrics.Counter) float64 {

// TestGauge puts some values through the gauge, and then calls the value func
// to check that the gauge has the correct final value.
func TestGauge(gauge metrics.Gauge, value func() float64) error {
func TestGauge(gauge metrics.Gauge, value func() []float64) error {
a := rand.Perm(100)
n := rand.Intn(len(a))

var want float64
var want []float64
for i := 0; i < n; i++ {
f := float64(a[i])
gauge.Set(f)
want = f
want = append(want, f)
}

for i := 0; i < n; i++ {
f := float64(a[i])
gauge.Add(f)
want += f
want[len(want)-1] += f
}

if have := value(); want != have {
if have := value(); reflect.DeepEqual(want, have) {
return fmt.Errorf("want %f, have %f", want, have)
}

Expand Down