Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit metrics for local memberlist size and remote memberlist size #276

Merged
merged 12 commits into from Sep 14, 2022
Merged
7 changes: 1 addition & 6 deletions memberlist_test.go
Expand Up @@ -259,13 +259,8 @@ func TestCreate_checkBroadcastQueueMetrics(t *testing.T) {

time.Sleep(3 * time.Second)

intv := getIntervalMetrics(t, sink)
sampleName := "consul.usage.test.memberlist.queue.broadcasts"
actualSample := intv.Samples[sampleName]

if actualSample.Count == 0 {
t.Fatalf("%s sample not taken", sampleName)
}
verifySampleExists(t, sampleName, sink)
}

func TestCreate_keyringOnly(t *testing.T) {
Expand Down
21 changes: 21 additions & 0 deletions net.go
Expand Up @@ -1007,6 +1007,22 @@ func (m *Memberlist) sendLocalState(conn net.Conn, join bool, streamLabel string
}
m.nodeLock.RUnlock()

nodeStateCounts := make(map[string]int)
nodeStateCounts[StateAlive.metricsString()] = 0
nodeStateCounts[StateLeft.metricsString()] = 0
nodeStateCounts[StateDead.metricsString()] = 0
nodeStateCounts[StateSuspect.metricsString()] = 0

for _, n := range localNodes {
nodeStateCounts[n.State.metricsString()]++
}

for nodeState, cnt := range nodeStateCounts {
metrics.SetGaugeWithLabels([]string{"memberlist", "node", "instances"},
float32(cnt),
append(m.metricLabels, metrics.Label{Name: "node_state", Value: nodeState}))
}

// Get the delegate state
var userData []byte
if m.config.Delegate != nil {
Expand Down Expand Up @@ -1042,6 +1058,9 @@ func (m *Memberlist) sendLocalState(conn net.Conn, join bool, streamLabel string
}
}

moreBytes := binary.BigEndian.Uint32(bufConn.Bytes()[1:5])
metrics.SetGaugeWithLabels([]string{"memberlist", "size", "local"}, float32(moreBytes), m.metricLabels)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any suggestions for a better metric name? (Also, see the question below about removing the remote metric, which might make naming this one easier.)


// Get the send buffer
return m.rawSendMsgStream(conn, bufConn.Bytes(), streamLabel)
}
Expand Down Expand Up @@ -1088,6 +1107,8 @@ func (m *Memberlist) decryptRemoteState(bufConn io.Reader, streamLabel string) (
// Ensure we aren't asked to download too much. This is to guard against
// an attack vector where a huge amount of state is sent
moreBytes := binary.BigEndian.Uint32(cipherText.Bytes()[1:5])
metrics.SetGaugeWithLabels([]string{"memberlist", "size", "remote"}, float32(moreBytes), m.metricLabels)
jmurret marked this conversation as resolved.
Show resolved Hide resolved

if moreBytes > maxPushStateBytes {
return nil, fmt.Errorf("Remote node state is larger than limit (%d)", moreBytes)

Expand Down
15 changes: 15 additions & 0 deletions state.go
Expand Up @@ -15,6 +15,21 @@ import (

type NodeStateType int

func (t NodeStateType) metricsString() string {
switch t {
case StateAlive:
return "alive"
case StateDead:
return "dead"
case StateSuspect:
return "suspect"
case StateLeft:
return "left"
default:
return ""
jmurret marked this conversation as resolved.
Show resolved Hide resolved
}
}

const (
StateAlive NodeStateType = iota
StateSuspect
Expand Down
28 changes: 28 additions & 0 deletions state_test.go
Expand Up @@ -2239,6 +2239,8 @@ func TestMemberlist_PushPull(t *testing.T) {
ip1 := []byte(addr1)
ip2 := []byte(addr2)

sink := registerInMemorySink(t)

ch := make(chan NodeEvent, 3)

m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
Expand Down Expand Up @@ -2270,6 +2272,13 @@ func TestMemberlist_PushPull(t *testing.T) {
if len(ch) < 2 {
failf("expected 2 messages from pushPull")
}

instancesMetricName := "consul.usage.test.memberlist.node.instances"
verifyGaugeExists(t, "consul.usage.test.memberlist.size.local", sink)
verifyGaugeExists(t, fmt.Sprintf("%s;node_state=%s", instancesMetricName, StateAlive.metricsString()), sink)
verifyGaugeExists(t, fmt.Sprintf("%s;node_state=%s", instancesMetricName, StateDead.metricsString()), sink)
verifyGaugeExists(t, fmt.Sprintf("%s;node_state=%s", instancesMetricName, StateLeft.metricsString()), sink)
verifyGaugeExists(t, fmt.Sprintf("%s;node_state=%s", instancesMetricName, StateSuspect.metricsString()), sink)
})
}

Expand Down Expand Up @@ -2412,3 +2421,22 @@ func getIntervalMetrics(t *testing.T, sink *metrics.InmemSink) *metrics.Interval
intv := intervals[0]
return intv
}

// verifyGaugeExists fails the test immediately when no gauge is stored under
// name in the interval that getIntervalMetrics returns for sink.
//
// name is looked up verbatim as a key of the interval's Gauges map, so it
// must be the complete key under which the gauge was recorded.
func verifyGaugeExists(t *testing.T, name string, sink *metrics.InmemSink) {
	// Report a failure at the calling test's line instead of inside this helper.
	t.Helper()

	interval := getIntervalMetrics(t, sink)

	// Hold the interval's read lock while its Gauges map is inspected.
	interval.RLock()
	defer interval.RUnlock()

	if _, ok := interval.Gauges[name]; !ok {
		t.Fatalf("%s gauge not emitted", name)
	}
}

// verifySampleExists fails the test immediately when no sample is stored
// under name in the interval that getIntervalMetrics returns for sink.
//
// Only the presence of the key in the interval's Samples map is checked; the
// contents of the sample are not examined.
func verifySampleExists(t *testing.T, name string, sink *metrics.InmemSink) {
	// Report a failure at the calling test's line instead of inside this helper.
	t.Helper()

	interval := getIntervalMetrics(t, sink)

	// Hold the interval's read lock while its Samples map is inspected.
	interval.RLock()
	defer interval.RUnlock()

	if _, ok := interval.Samples[name]; !ok {
		t.Fatalf("%s sample not emitted", name)
	}
}