Skip to content

Commit

Permalink
use batch values API for CloudWatch PutMetric data call (#960)
Browse files Browse the repository at this point in the history
* use batch values API for CloudWatch PutMetric data call

which was introduced at https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md#release-v11536-2018-09-17

* fix test, so they can accept the list of received values from the gauge

* use batch api always
  • Loading branch information
taraspos committed Feb 20, 2020
1 parent cc938d5 commit 7dd0815
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 36 deletions.
28 changes: 24 additions & 4 deletions metrics/cloudwatch/cloudwatch.go
Expand Up @@ -20,6 +20,7 @@ import (

const (
maxConcurrentRequests = 20
maxValuesInABatch = 150
)

type Percentiles []struct {
Expand Down Expand Up @@ -174,13 +175,32 @@ func (cw *CloudWatch) Send() error {
})

cw.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
value := last(values)
datums = append(datums, &cloudwatch.MetricDatum{
datum := &cloudwatch.MetricDatum{
MetricName: aws.String(name),
Dimensions: makeDimensions(lvs...),
Value: aws.Float64(value),
Timestamp: aws.Time(now),
})
}

if len(values) == 0 {
return true
}

// CloudWatch Put Metrics API (https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html)
// expects batch of unique values including the array of corresponding counts
valuesCounter := make(map[float64]int)
for _, v := range values {
valuesCounter[v]++
}

for value, count := range valuesCounter {
if len(datum.Values) == maxValuesInABatch {
break
}
datum.Values = append(datum.Values, aws.Float64(value))
datum.Counts = append(datum.Counts, aws.Float64(float64(count)))
}

datums = append(datums, datum)
return true
})

Expand Down
72 changes: 55 additions & 17 deletions metrics/cloudwatch/cloudwatch_test.go
Expand Up @@ -18,13 +18,13 @@ import (
type mockCloudWatch struct {
cloudwatchiface.CloudWatchAPI
mtx sync.RWMutex
valuesReceived map[string]float64
valuesReceived map[string][]float64
dimensionsReceived map[string][]*cloudwatch.Dimension
}

func newMockCloudWatch() *mockCloudWatch {
return &mockCloudWatch{
valuesReceived: map[string]float64{},
valuesReceived: map[string][]float64{},
dimensionsReceived: map[string][]*cloudwatch.Dimension{},
}
}
Expand All @@ -33,7 +33,13 @@ func (mcw *mockCloudWatch) PutMetricData(input *cloudwatch.PutMetricDataInput) (
mcw.mtx.Lock()
defer mcw.mtx.Unlock()
for _, datum := range input.MetricData {
mcw.valuesReceived[*datum.MetricName] = *datum.Value
if len(datum.Values) > 0 {
for _, v := range datum.Values {
mcw.valuesReceived[*datum.MetricName] = append(mcw.valuesReceived[*datum.MetricName], *v)
}
} else {
mcw.valuesReceived[*datum.MetricName] = append(mcw.valuesReceived[*datum.MetricName], *datum.Value)
}
mcw.dimensionsReceived[*datum.MetricName] = datum.Dimensions
}
return nil, nil
Expand Down Expand Up @@ -76,13 +82,15 @@ func TestCounter(t *testing.T) {
cw := New(namespace, svc, WithLogger(log.NewNopLogger()))
counter := cw.NewCounter(name).With(label, value)
valuef := func() float64 {
err := cw.Send()
if err != nil {
if err := cw.Send(); err != nil {
t.Fatal(err)
}
svc.mtx.RLock()
defer svc.mtx.RUnlock()
return svc.valuesReceived[name]
value := svc.valuesReceived[name][len(svc.valuesReceived[name])-1]
delete(svc.valuesReceived, name)

return value
}
if err := teststat.TestCounter(counter, valuef); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -123,7 +131,13 @@ func TestCounterLowSendConcurrency(t *testing.T) {
}

for i, name := range names {
if svc.valuesReceived[name] != wants[i] {
if l := len(svc.valuesReceived[name]); l == 0 && wants[i] == 0 {
continue
} else if l != 1 {
t.Fatalf("one value expected, got %d", l)
}

if svc.valuesReceived[name][0] != wants[i] {
t.Fatalf("want %f, have %f", wants[i], svc.valuesReceived[name])
}
if err := svc.testDimensions(name, labels[i], values[i]); err != nil {
Expand All @@ -138,15 +152,17 @@ func TestGauge(t *testing.T) {
svc := newMockCloudWatch()
cw := New(namespace, svc, WithLogger(log.NewNopLogger()))
gauge := cw.NewGauge(name).With(label, value)
valuef := func() float64 {
err := cw.Send()
if err != nil {
valuef := func() []float64 {
if err := cw.Send(); err != nil {
t.Fatal(err)
}
svc.mtx.RLock()
res := svc.valuesReceived[name]
delete(svc.valuesReceived, name)
defer svc.mtx.RUnlock()
return svc.valuesReceived[name]
return res
}

if err := teststat.TestGauge(gauge, valuef); err != nil {
t.Fatal(err)
}
Expand All @@ -170,12 +186,28 @@ func TestHistogram(t *testing.T) {
if err != nil {
t.Fatal(err)
}

svc.mtx.RLock()
defer svc.mtx.RUnlock()
p50 = svc.valuesReceived[n50]
p90 = svc.valuesReceived[n90]
p95 = svc.valuesReceived[n95]
p99 = svc.valuesReceived[n99]
if len(svc.valuesReceived[n50]) > 0 {
p50 = svc.valuesReceived[n50][0]
delete(svc.valuesReceived, n50)
}

if len(svc.valuesReceived[n90]) > 0 {
p90 = svc.valuesReceived[n90][0]
delete(svc.valuesReceived, n90)
}

if len(svc.valuesReceived[n95]) > 0 {
p95 = svc.valuesReceived[n95][0]
delete(svc.valuesReceived, n95)
}

if len(svc.valuesReceived[n99]) > 0 {
p99 = svc.valuesReceived[n99][0]
delete(svc.valuesReceived, n99)
}
return
}
if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
Expand Down Expand Up @@ -207,8 +239,14 @@ func TestHistogram(t *testing.T) {
}
svc.mtx.RLock()
defer svc.mtx.RUnlock()
p50 = svc.valuesReceived[n50]
p90 = svc.valuesReceived[n90]
if len(svc.valuesReceived[n50]) > 0 {
p50 = svc.valuesReceived[n50][0]
delete(svc.valuesReceived, n50)
}
if len(svc.valuesReceived[n90]) > 0 {
p90 = svc.valuesReceived[n90][0]
delete(svc.valuesReceived, n90)
}

// our teststat.TestHistogram wants us to give p95 and p99,
// but with custom percentiles we don't have those.
Expand Down
2 changes: 1 addition & 1 deletion metrics/expvar/expvar_test.go
Expand Up @@ -17,7 +17,7 @@ func TestCounter(t *testing.T) {

func TestGauge(t *testing.T) {
gauge := NewGauge("expvar_gauge").With("label values", "not supported").(*Gauge)
value := func() float64 { f, _ := strconv.ParseFloat(gauge.f.String(), 64); return f }
value := func() []float64 { f, _ := strconv.ParseFloat(gauge.f.String(), 64); return []float64{f} }
if err := teststat.TestGauge(gauge, value); err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion metrics/generic/generic_test.go
Expand Up @@ -45,7 +45,7 @@ func TestGauge(t *testing.T) {
if want, have := name, gauge.Name; want != have {
t.Errorf("Name: want %q, have %q", want, have)
}
value := gauge.Value
value := func() []float64 { return []float64{gauge.Value()} }
if err := teststat.TestGauge(gauge, value); err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions metrics/influx/influx_test.go
Expand Up @@ -34,12 +34,12 @@ func TestGauge(t *testing.T) {
in := New(map[string]string{"foo": "alpha"}, influxdb.BatchPointsConfig{}, log.NewNopLogger())
re := regexp.MustCompile(`influx_gauge,foo=alpha value=([0-9\.]+) [0-9]+`)
gauge := in.NewGauge("influx_gauge")
value := func() float64 {
value := func() []float64 {
client := &bufWriter{}
in.WriteTo(client)
match := re.FindStringSubmatch(client.buf.String())
f, _ := strconv.ParseFloat(match[1], 64)
return f
return []float64{f}
}
if err := teststat.TestGauge(gauge, value); err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion metrics/pcp/pcp_test.go
Expand Up @@ -40,7 +40,7 @@ func TestGauge(t *testing.T) {

gauge = gauge.With("label values", "not supported").(*Gauge)

value := func() float64 { f := gauge.g.Val(); return f }
value := func() []float64 { f := gauge.g.Val(); return []float64{f} }
if err := teststat.TestGauge(gauge, value); err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions metrics/prometheus/prometheus_test.go
Expand Up @@ -68,10 +68,10 @@ func TestGauge(t *testing.T) {
Help: "This is a different help string.",
}, []string{"foo"}).With("foo", "bar")

value := func() float64 {
value := func() []float64 {
matches := re.FindStringSubmatch(scrape())
f, _ := strconv.ParseFloat(matches[1], 64)
return f
return []float64{f}
}

if err := teststat.TestGauge(gauge, value); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions metrics/teststat/buffers.go
Expand Up @@ -23,10 +23,10 @@ func SumLines(w io.WriterTo, regex string) func() float64 {
// LastLine expects a regex whose first capture group can be parsed as a
// float64. It will dump the WriterTo and parse each line, expecting to find a
// match. It returns the final captured float.
func LastLine(w io.WriterTo, regex string) func() float64 {
return func() float64 {
func LastLine(w io.WriterTo, regex string) func() []float64 {
return func() []float64 {
_, final := stats(w, regex, nil)
return final
return []float64{final}
}
}

Expand Down
11 changes: 6 additions & 5 deletions metrics/teststat/teststat.go
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math"
"math/rand"
"reflect"
"strings"

"github.com/go-kit/kit/metrics"
Expand Down Expand Up @@ -38,24 +39,24 @@ func FillCounter(counter metrics.Counter) float64 {

// TestGauge puts some values through the gauge, and then calls the value func
// to check that the gauge has the correct final value.
func TestGauge(gauge metrics.Gauge, value func() float64) error {
func TestGauge(gauge metrics.Gauge, value func() []float64) error {
a := rand.Perm(100)
n := rand.Intn(len(a))

var want float64
var want []float64
for i := 0; i < n; i++ {
f := float64(a[i])
gauge.Set(f)
want = f
want = append(want, f)
}

for i := 0; i < n; i++ {
f := float64(a[i])
gauge.Add(f)
want += f
want[len(want)-1] += f
}

if have := value(); want != have {
if have := value(); reflect.DeepEqual(want, have) {
return fmt.Errorf("want %f, have %f", want, have)
}

Expand Down

0 comments on commit 7dd0815

Please sign in to comment.