From b91287107c231a6c9b47992940da8b9a3608d5c1 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Mon, 18 Jul 2022 15:17:17 -0500 Subject: [PATCH 1/4] adding metric labels to all metrics patched off of v0.3.1 --- awareness.go | 12 ++++++++---- awareness_test.go | 2 +- config.go | 4 ++++ memberlist.go | 16 ++++++++++++---- net_transport.go | 31 ++++++++++++++++++++++++++----- state.go | 16 ++++++++-------- 6 files changed, 59 insertions(+), 22 deletions(-) diff --git a/awareness.go b/awareness.go index ea95c7538..53b1bb313 100644 --- a/awareness.go +++ b/awareness.go @@ -21,13 +21,17 @@ type awareness struct { // score is the current awareness score. Lower values are healthier and // zero is the minimum value. score int + + // metricLabels is the slice of labels to put on all emitted metrics + metricLabels []metrics.Label } // newAwareness returns a new awareness object. -func newAwareness(max int) *awareness { +func newAwareness(max int, metricLabels []metrics.Label) *awareness { return &awareness{ - max: max, - score: 0, + max: max, + score: 0, + metricLabels: metricLabels, } } @@ -47,7 +51,7 @@ func (a *awareness) ApplyDelta(delta int) { a.Unlock() if initial != final { - metrics.SetGauge([]string{"memberlist", "health", "score"}, float32(final)) + metrics.SetGaugeWithLabels([]string{"memberlist", "health", "score"}, float32(final), a.metricLabels) } } diff --git a/awareness_test.go b/awareness_test.go index c6ade10af..1e8fc7444 100644 --- a/awareness_test.go +++ b/awareness_test.go @@ -27,7 +27,7 @@ func TestAwareness(t *testing.T) { {-1, 0, 1 * time.Second}, } - a := newAwareness(8) + a := newAwareness(8, nil) for i, c := range cases { a.ApplyDelta(c.delta) if a.GetHealthScore() != c.score { diff --git a/config.go b/config.go index d7fe4c37b..bf9002fae 100644 --- a/config.go +++ b/config.go @@ -244,10 +244,14 @@ type Config struct { // RequireNodeNames controls if the name of a node is required when sending // a message to that node. RequireNodeNames bool + // CIDRsAllowed If nil, allow any connection (default), otherwise specify all networks // allowed to connect (you must specify IPv6/IPv4 separately) // Using [] will block all connections. CIDRsAllowed []net.IPNet + + // MetricLabels is a map of optional labels to apply to all metrics emitted. + MetricLabels map[string]string } // ParseCIDRs return a possible empty list of all Network that have been parsed diff --git a/memberlist.go b/memberlist.go index cab6db69f..958b47e3a 100644 --- a/memberlist.go +++ b/memberlist.go @@ -27,6 +27,7 @@ import ( "sync/atomic" "time" + "github.com/armon/go-metrics" multierror "github.com/hashicorp/go-multierror" sockaddr "github.com/hashicorp/go-sockaddr" "github.com/miekg/dns" @@ -77,6 +78,9 @@ type Memberlist struct { broadcasts *TransmitLimitedQueue logger *log.Logger + + // metricLabels is the slice of labels to put on all emitted metrics + metricLabels []metrics.Label } // BuildVsnArray creates the array of Vsn @@ -135,9 +139,10 @@ func newMemberlist(conf *Config) (*Memberlist, error) { transport := conf.Transport if transport == nil { nc := &NetTransportConfig{ - BindAddrs: []string{conf.BindAddr}, - BindPort: conf.BindPort, - Logger: logger, + BindAddrs: []string{conf.BindAddr}, + BindPort: conf.BindPort, + Logger: logger, + MetricLabels: conf.MetricLabels, } // See comment below for details about the retry in here. @@ -198,6 +203,8 @@ func newMemberlist(conf *Config) (*Memberlist, error) { } } + metricLabels := mapToLabels(conf.MetricLabels) + m := &Memberlist{ config: conf, shutdownCh: make(chan struct{}), @@ -208,10 +215,11 @@ func newMemberlist(conf *Config) (*Memberlist, error) { lowPriorityMsgQueue: list.New(), nodeMap: make(map[string]*nodeState), nodeTimers: make(map[string]*suspicion), - awareness: newAwareness(conf.AwarenessMaxMultiplier), + awareness: newAwareness(conf.AwarenessMaxMultiplier, metricLabels), ackHandlers: make(map[uint32]*ackHandler), broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult}, logger: logger, + metricLabels: metricLabels, } m.broadcasts.NumNodes = func() int { return m.estNumNodes() diff --git a/net_transport.go b/net_transport.go index 058301172..cd8dbf9db 100644 --- a/net_transport.go +++ b/net_transport.go @@ -35,6 +35,10 @@ type NetTransportConfig struct { // Logger is a logger for operator messages. Logger *log.Logger + + // MetricLabels is a map of optional labels to apply to all metrics + // emitted by this transport. + MetricLabels map[string]string } // NetTransport is a Transport implementation that uses connectionless UDP for @@ -48,6 +52,8 @@ type NetTransport struct { tcpListeners []*net.TCPListener udpListeners []*net.UDPConn shutdown int32 + + metricLabels []metrics.Label } var _ NodeAwareTransport = (*NetTransport)(nil) @@ -64,10 +70,11 @@ func NewNetTransport(config *NetTransportConfig) (*NetTransport, error) { // Build out the new transport. var ok bool t := NetTransport{ - config: config, - packetCh: make(chan *Packet), - streamCh: make(chan net.Conn), - logger: config.Logger, + config: config, + packetCh: make(chan *Packet), + streamCh: make(chan net.Conn), + logger: config.Logger, + metricLabels: mapToLabels(config.MetricLabels), } // Clean up listeners if there's an error. @@ -341,7 +348,7 @@ func (t *NetTransport) udpListen(udpLn *net.UDPConn) { } // Ingest the packet. - metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n)) + metrics.IncrCounterWithLabels([]string{"memberlist", "udp", "received"}, float32(n), t.metricLabels) t.packetCh <- &Packet{ Buf: buf[:n], From: addr, @@ -364,3 +371,17 @@ func setUDPRecvBuf(c *net.UDPConn) error { } return err } + +func mapToLabels(m map[string]string) []metrics.Label { + if len(m) == 0 { + return nil + } + out := make([]metrics.Label, 0, len(m)) + for k, v := range m { + out = append(out, metrics.Label{ + Name: k, + Value: v, + }) + } + return out +} diff --git a/state.go b/state.go index 7a2339e9b..a9ee88996 100644 --- a/state.go +++ b/state.go @@ -286,14 +286,14 @@ func failedRemote(err error) bool { // probeNode handles a single round of failure checking on a node. func (m *Memberlist) probeNode(node *nodeState) { - defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now()) + defer metrics.MeasureSinceWithLabels([]string{"memberlist", "probeNode"}, time.Now(), m.metricLabels) // We use our health awareness to scale the overall probe interval, so we // slow down if we detect problems. The ticker that calls us can handle // us running over the base interval, and will skip missed ticks. probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval) if probeInterval > m.config.ProbeInterval { - metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1) + metrics.IncrCounterWithLabels([]string{"memberlist", "degraded", "probe"}, 1, m.metricLabels) } // Prepare a ping message and setup an ack handler. @@ -573,7 +573,7 @@ func (m *Memberlist) resetNodes() { // gossip is invoked every GossipInterval period to broadcast our gossip // messages to a few random nodes. func (m *Memberlist) gossip() { - defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now()) + defer metrics.MeasureSinceWithLabels([]string{"memberlist", "gossip"}, time.Now(), m.metricLabels) // Get some random live, suspect, or recently dead nodes m.nodeLock.RLock() @@ -653,7 +653,7 @@ func (m *Memberlist) pushPull() { // pushPullNode does a complete state exchange with a specific node. func (m *Memberlist) pushPullNode(a Address, join bool) error { - defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now()) + defer metrics.MeasureSinceWithLabels([]string{"memberlist", "pushPullNode"}, time.Now(), m.metricLabels) // Attempt to send and receive with the node remote, userState, err := m.sendAndReceiveState(a, join) @@ -1125,7 +1125,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { } // Update metrics - metrics.IncrCounter([]string{"memberlist", "msg", "alive"}, 1) + metrics.IncrCounterWithLabels([]string{"memberlist", "msg", "alive"}, 1, m.metricLabels) // Notify the delegate of any relevant updates if m.config.Events != nil { @@ -1183,7 +1183,7 @@ func (m *Memberlist) suspectNode(s *suspect) { } // Update metrics - metrics.IncrCounter([]string{"memberlist", "msg", "suspect"}, 1) + metrics.IncrCounterWithLabels([]string{"memberlist", "msg", "suspect"}, 1, m.metricLabels) // Update the state state.Incarnation = s.Incarnation @@ -1221,7 +1221,7 @@ func (m *Memberlist) suspectNode(s *suspect) { if timeout { if k > 0 && numConfirmations < k { - metrics.IncrCounter([]string{"memberlist", "degraded", "timeout"}, 1) + metrics.IncrCounterWithLabels([]string{"memberlist", "degraded", "timeout"}, 1, m.metricLabels) } m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)", @@ -1274,7 +1274,7 @@ func (m *Memberlist) deadNode(d *dead) { } // Update metrics - metrics.IncrCounter([]string{"memberlist", "msg", "dead"}, 1) + metrics.IncrCounterWithLabels([]string{"memberlist", "msg", "dead"}, 1, m.metricLabels) // Update the state state.Incarnation = d.Incarnation From fe86bd008cb883db29a2552d22563b6196eb1ce8 Mon Sep 17 00:00:00 2001 From: cskh Date: Mon, 1 Aug 2022 12:41:05 -0400 Subject: [PATCH 2/4] Add metrics and test TestUtil_MapToLabels (#267) --- net.go | 8 ++++---- net_transport.go | 14 -------------- util.go | 15 +++++++++++++++ util_test.go | 27 +++++++++++++++++++++++++++ 4 files changed, 46 insertions(+), 18 deletions(-) diff --git a/net.go b/net.go index 1555b172a..a8291c4f3 100644 --- a/net.go +++ b/net.go @@ -234,7 +234,7 @@ func (m *Memberlist) handleConn(conn net.Conn) { defer conn.Close() m.logger.Printf("[DEBUG] memberlist: Stream connection %s", LogConn(conn)) - metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1) + metrics.IncrCounterWithLabels([]string{"memberlist", "tcp", "accept"}, 1, m.metricLabels) conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) @@ -869,7 +869,7 @@ func (m *Memberlist) rawSendMsgPacket(a Address, node *Node, msg []byte) error { msg = buf.Bytes() } - metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg))) + metrics.IncrCounterWithLabels([]string{"memberlist", "udp", "sent"}, float32(len(msg)), m.metricLabels) _, err := m.transport.WriteToAddress(msg, a) return err } @@ -898,7 +898,7 @@ func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte, streamLabel } // Write out the entire send buffer - metrics.IncrCounter([]string{"memberlist", "tcp", "sent"}, float32(len(sendBuf))) + metrics.IncrCounterWithLabels([]string{"memberlist", "tcp", "sent"}, float32(len(sendBuf)), m.metricLabels) if n, err := conn.Write(sendBuf); err != nil { return err @@ -953,7 +953,7 @@ func (m *Memberlist) sendAndReceiveState(a Address, join bool) ([]pushNodeState, } defer conn.Close() m.logger.Printf("[DEBUG] memberlist: Initiating push/pull sync with: %s %s", a.Name, conn.RemoteAddr()) - metrics.IncrCounter([]string{"memberlist", "tcp", "connect"}, 1) + metrics.IncrCounterWithLabels([]string{"memberlist", "tcp", "connect"}, 1, m.metricLabels) // Send our state if err := m.sendLocalState(conn, join, m.config.Label); err != nil { diff --git a/net_transport.go b/net_transport.go index cd8dbf9db..413e83502 100644 --- a/net_transport.go +++ b/net_transport.go @@ -371,17 +371,3 @@ func setUDPRecvBuf(c *net.UDPConn) error { } return err } - -func mapToLabels(m map[string]string) []metrics.Label { - if len(m) == 0 { - return nil - } - out := make([]metrics.Label, 0, len(m)) - for k, v := range m { - out = append(out, metrics.Label{ - Name: k, - Value: v, - }) - } - return out -} diff --git a/util.go b/util.go index 24112210d..d23dc46c3 100644 --- a/util.go +++ b/util.go @@ -13,6 +13,7 @@ import ( "strings" "time" + metrics "github.com/armon/go-metrics" "github.com/hashicorp/go-msgpack/codec" "github.com/sean-/seed" ) @@ -324,3 +325,17 @@ func ensurePort(s string, port int) string { s = net.JoinHostPort(s, strconv.Itoa(port)) return s } + +func mapToLabels(m map[string]string) []metrics.Label { + if len(m) == 0 { + return nil + } + out := make([]metrics.Label, 0, len(m)) + for k, v := range m { + out = append(out, metrics.Label{ + Name: k, + Value: v, + }) + } + return out +} diff --git a/util_test.go b/util_test.go index 5e3edb633..2e9668155 100644 --- a/util_test.go +++ b/util_test.go @@ -384,3 +384,30 @@ func TestCompressDecompressPayload(t *testing.T) { t.Fatalf("bad payload: %v", decomp) } } + +func TestUtil_MapToLabels(t *testing.T) { + tests := []struct { + name string + labels map[string]string + }{ + {"empty labels", nil}, + { + name: "multiple labels", + labels: map[string]string{ + "name1": "value1", + "name2": "value2", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := mapToLabels(tt.labels) + countLabels := 0 + for _, kv := range got { + require.Equal(t, tt.labels[kv.Name], kv.Value) + countLabels++ + } + require.Equal(t, len(tt.labels), countLabels) + }) + } +} From 59d7d84c57b9a5365d4f5783ad95a7376c5ef682 Mon Sep 17 00:00:00 2001 From: cskh Date: Thu, 4 Aug 2022 10:39:02 -0400 Subject: [PATCH 3/4] Change metrics type from map[string]string to []metrics --- config.go | 3 ++- memberlist.go | 2 +- net_transport.go | 4 ++-- util.go | 15 --------------- util_test.go | 27 --------------------------- 5 files changed, 5 insertions(+), 46 deletions(-) diff --git a/config.go b/config.go index bf9002fae..d83a4f3fc 100644 --- a/config.go +++ b/config.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/armon/go-metrics" multierror "github.com/hashicorp/go-multierror" ) @@ -251,7 +252,7 @@ type Config struct { CIDRsAllowed []net.IPNet // MetricLabels is a map of optional labels to apply to all metrics emitted. - MetricLabels map[string]string + MetricLabels []metrics.Label } // ParseCIDRs return a possible empty list of all Network that have been parsed diff --git a/memberlist.go b/memberlist.go index 958b47e3a..b50496b75 100644 --- a/memberlist.go +++ b/memberlist.go @@ -203,7 +203,7 @@ func newMemberlist(conf *Config) (*Memberlist, error) { } } - metricLabels := mapToLabels(conf.MetricLabels) + metricLabels := conf.MetricLabels m := &Memberlist{ config: conf, diff --git a/net_transport.go b/net_transport.go index 413e83502..a379c855c 100644 --- a/net_transport.go +++ b/net_transport.go @@ -38,7 +38,7 @@ type NetTransportConfig struct { // MetricLabels is a map of optional labels to apply to all metrics // emitted by this transport. - MetricLabels map[string]string + MetricLabels []metrics.Label } // NetTransport is a Transport implementation that uses connectionless UDP for @@ -74,7 +74,7 @@ func NewNetTransport(config *NetTransportConfig) (*NetTransport, error) { packetCh: make(chan *Packet), streamCh: make(chan net.Conn), logger: config.Logger, - metricLabels: mapToLabels(config.MetricLabels), + metricLabels: config.MetricLabels, } // Clean up listeners if there's an error. diff --git a/util.go b/util.go index d23dc46c3..24112210d 100644 --- a/util.go +++ b/util.go @@ -13,7 +13,6 @@ import ( "strings" "time" - metrics "github.com/armon/go-metrics" "github.com/hashicorp/go-msgpack/codec" "github.com/sean-/seed" ) @@ -325,17 +324,3 @@ func ensurePort(s string, port int) string { s = net.JoinHostPort(s, strconv.Itoa(port)) return s } - -func mapToLabels(m map[string]string) []metrics.Label { - if len(m) == 0 { - return nil - } - out := make([]metrics.Label, 0, len(m)) - for k, v := range m { - out = append(out, metrics.Label{ - Name: k, - Value: v, - }) - } - return out -} diff --git a/util_test.go b/util_test.go index 2e9668155..5e3edb633 100644 --- a/util_test.go +++ b/util_test.go @@ -384,30 +384,3 @@ func TestCompressDecompressPayload(t *testing.T) { t.Fatalf("bad payload: %v", decomp) } } - -func TestUtil_MapToLabels(t *testing.T) { - tests := []struct { - name string - labels map[string]string - }{ - {"empty labels", nil}, - { - name: "multiple labels", - labels: map[string]string{ - "name1": "value1", - "name2": "value2", - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := mapToLabels(tt.labels) - countLabels := 0 - for _, kv := range got { - require.Equal(t, tt.labels[kv.Name], kv.Value) - countLabels++ - } - require.Equal(t, len(tt.labels), countLabels) - }) - } -} From ad9884f85af57e91ae3aaea048d6c26312e95adc Mon Sep 17 00:00:00 2001 From: cskh Date: Mon, 8 Aug 2022 13:45:29 -0400 Subject: [PATCH 4/4] remove unnecessary variable --- memberlist.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/memberlist.go b/memberlist.go index b50496b75..512701dee 100644 --- a/memberlist.go +++ b/memberlist.go @@ -203,8 +203,6 @@ func newMemberlist(conf *Config) (*Memberlist, error) { } } - metricLabels := conf.MetricLabels - m := &Memberlist{ config: conf, shutdownCh: make(chan struct{}), @@ -215,11 +213,11 @@ func newMemberlist(conf *Config) (*Memberlist, error) { lowPriorityMsgQueue: list.New(), nodeMap: make(map[string]*nodeState), nodeTimers: make(map[string]*suspicion), - awareness: newAwareness(conf.AwarenessMaxMultiplier, metricLabels), + awareness: newAwareness(conf.AwarenessMaxMultiplier, conf.MetricLabels), ackHandlers: make(map[uint32]*ackHandler), broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult}, logger: logger, - metricLabels: metricLabels, + metricLabels: conf.MetricLabels, } m.broadcasts.NumNodes = func() int { return m.estNumNodes()