From e6ff9b2d87a3f0f3f04abb5672ada3ac2a640223 Mon Sep 17 00:00:00 2001 From: cskh Date: Mon, 8 Aug 2022 13:47:25 -0400 Subject: [PATCH] Add metrics labels to all metrics (#270) * adding metric labels to all metrics Co-authored-by: R.B. Boyer --- awareness.go | 12 ++++++++---- awareness_test.go | 2 +- config.go | 5 +++++ memberlist.go | 14 ++++++++++---- net.go | 8 ++++---- net_transport.go | 17 ++++++++++++----- state.go | 16 ++++++++-------- 7 files changed, 48 insertions(+), 26 deletions(-) diff --git a/awareness.go b/awareness.go index ea95c7538..53b1bb313 100644 --- a/awareness.go +++ b/awareness.go @@ -21,13 +21,17 @@ type awareness struct { // score is the current awareness score. Lower values are healthier and // zero is the minimum value. score int + + // metricLabels is the slice of labels to put on all emitted metrics + metricLabels []metrics.Label } // newAwareness returns a new awareness object. -func newAwareness(max int) *awareness { +func newAwareness(max int, metricLabels []metrics.Label) *awareness { return &awareness{ - max: max, - score: 0, + max: max, + score: 0, + metricLabels: metricLabels, } } @@ -47,7 +51,7 @@ func (a *awareness) ApplyDelta(delta int) { a.Unlock() if initial != final { - metrics.SetGauge([]string{"memberlist", "health", "score"}, float32(final)) + metrics.SetGaugeWithLabels([]string{"memberlist", "health", "score"}, float32(final), a.metricLabels) } } diff --git a/awareness_test.go b/awareness_test.go index c6ade10af..1e8fc7444 100644 --- a/awareness_test.go +++ b/awareness_test.go @@ -27,7 +27,7 @@ func TestAwareness(t *testing.T) { {-1, 0, 1 * time.Second}, } - a := newAwareness(8) + a := newAwareness(8, nil) for i, c := range cases { a.ApplyDelta(c.delta) if a.GetHealthScore() != c.score { diff --git a/config.go b/config.go index d7fe4c37b..d83a4f3fc 100644 --- a/config.go +++ b/config.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/armon/go-metrics" multierror "github.com/hashicorp/go-multierror" ) @@ -244,10 
+245,14 @@ type Config struct { // RequireNodeNames controls if the name of a node is required when sending // a message to that node. RequireNodeNames bool + // CIDRsAllowed If nil, allow any connection (default), otherwise specify all networks // allowed to connect (you must specify IPv6/IPv4 separately) // Using [] will block all connections. CIDRsAllowed []net.IPNet + + // MetricLabels is a slice of optional labels to apply to all metrics emitted. + MetricLabels []metrics.Label } // ParseCIDRs return a possible empty list of all Network that have been parsed diff --git a/memberlist.go b/memberlist.go index cab6db69f..512701dee 100644 --- a/memberlist.go +++ b/memberlist.go @@ -27,6 +27,7 @@ import ( "sync/atomic" "time" + "github.com/armon/go-metrics" multierror "github.com/hashicorp/go-multierror" sockaddr "github.com/hashicorp/go-sockaddr" "github.com/miekg/dns" @@ -77,6 +78,9 @@ type Memberlist struct { broadcasts *TransmitLimitedQueue logger *log.Logger + + // metricLabels is the slice of labels to put on all emitted metrics + metricLabels []metrics.Label } // BuildVsnArray creates the array of Vsn @@ -135,9 +139,10 @@ func newMemberlist(conf *Config) (*Memberlist, error) { transport := conf.Transport if transport == nil { nc := &NetTransportConfig{ - BindAddrs: []string{conf.BindAddr}, - BindPort: conf.BindPort, - Logger: logger, + BindAddrs: []string{conf.BindAddr}, + BindPort: conf.BindPort, + Logger: logger, + MetricLabels: conf.MetricLabels, } // See comment below for details about the retry in here. 
@@ -208,10 +213,11 @@ func newMemberlist(conf *Config) (*Memberlist, error) { lowPriorityMsgQueue: list.New(), nodeMap: make(map[string]*nodeState), nodeTimers: make(map[string]*suspicion), - awareness: newAwareness(conf.AwarenessMaxMultiplier), + awareness: newAwareness(conf.AwarenessMaxMultiplier, conf.MetricLabels), ackHandlers: make(map[uint32]*ackHandler), broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult}, logger: logger, + metricLabels: conf.MetricLabels, } m.broadcasts.NumNodes = func() int { return m.estNumNodes() diff --git a/net.go b/net.go index 1555b172a..a8291c4f3 100644 --- a/net.go +++ b/net.go @@ -234,7 +234,7 @@ func (m *Memberlist) handleConn(conn net.Conn) { defer conn.Close() m.logger.Printf("[DEBUG] memberlist: Stream connection %s", LogConn(conn)) - metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1) + metrics.IncrCounterWithLabels([]string{"memberlist", "tcp", "accept"}, 1, m.metricLabels) conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) @@ -869,7 +869,7 @@ func (m *Memberlist) rawSendMsgPacket(a Address, node *Node, msg []byte) error { msg = buf.Bytes() } - metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg))) + metrics.IncrCounterWithLabels([]string{"memberlist", "udp", "sent"}, float32(len(msg)), m.metricLabels) _, err := m.transport.WriteToAddress(msg, a) return err } @@ -898,7 +898,7 @@ func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte, streamLabel } // Write out the entire send buffer - metrics.IncrCounter([]string{"memberlist", "tcp", "sent"}, float32(len(sendBuf))) + metrics.IncrCounterWithLabels([]string{"memberlist", "tcp", "sent"}, float32(len(sendBuf)), m.metricLabels) if n, err := conn.Write(sendBuf); err != nil { return err @@ -953,7 +953,7 @@ func (m *Memberlist) sendAndReceiveState(a Address, join bool) ([]pushNodeState, } defer conn.Close() m.logger.Printf("[DEBUG] memberlist: Initiating push/pull sync with: %s %s", a.Name, conn.RemoteAddr()) - 
metrics.IncrCounter([]string{"memberlist", "tcp", "connect"}, 1) + metrics.IncrCounterWithLabels([]string{"memberlist", "tcp", "connect"}, 1, m.metricLabels) // Send our state if err := m.sendLocalState(conn, join, m.config.Label); err != nil { diff --git a/net_transport.go b/net_transport.go index 058301172..a379c855c 100644 --- a/net_transport.go +++ b/net_transport.go @@ -35,6 +35,10 @@ type NetTransportConfig struct { // Logger is a logger for operator messages. Logger *log.Logger + + // MetricLabels is a slice of optional labels to apply to all metrics + // emitted by this transport. + MetricLabels []metrics.Label } // NetTransport is a Transport implementation that uses connectionless UDP for @@ -48,6 +52,8 @@ type NetTransport struct { tcpListeners []*net.TCPListener udpListeners []*net.UDPConn shutdown int32 + + metricLabels []metrics.Label } var _ NodeAwareTransport = (*NetTransport)(nil) @@ -64,10 +70,11 @@ func NewNetTransport(config *NetTransportConfig) (*NetTransport, error) { // Build out the new transport. var ok bool t := NetTransport{ - config: config, - packetCh: make(chan *Packet), - streamCh: make(chan net.Conn), - logger: config.Logger, + config: config, + packetCh: make(chan *Packet), + streamCh: make(chan net.Conn), + logger: config.Logger, + metricLabels: config.MetricLabels, } // Clean up listeners if there's an error. @@ -341,7 +348,7 @@ func (t *NetTransport) udpListen(udpLn *net.UDPConn) { } // Ingest the packet. - metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n)) + metrics.IncrCounterWithLabels([]string{"memberlist", "udp", "received"}, float32(n), t.metricLabels) t.packetCh <- &Packet{ Buf: buf[:n], From: addr, diff --git a/state.go b/state.go index 7a2339e9b..a9ee88996 100644 --- a/state.go +++ b/state.go @@ -286,14 +286,14 @@ func failedRemote(err error) bool { // probeNode handles a single round of failure checking on a node. 
func (m *Memberlist) probeNode(node *nodeState) { - defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now()) + defer metrics.MeasureSinceWithLabels([]string{"memberlist", "probeNode"}, time.Now(), m.metricLabels) // We use our health awareness to scale the overall probe interval, so we // slow down if we detect problems. The ticker that calls us can handle // us running over the base interval, and will skip missed ticks. probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval) if probeInterval > m.config.ProbeInterval { - metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1) + metrics.IncrCounterWithLabels([]string{"memberlist", "degraded", "probe"}, 1, m.metricLabels) } // Prepare a ping message and setup an ack handler. @@ -573,7 +573,7 @@ func (m *Memberlist) resetNodes() { // gossip is invoked every GossipInterval period to broadcast our gossip // messages to a few random nodes. func (m *Memberlist) gossip() { - defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now()) + defer metrics.MeasureSinceWithLabels([]string{"memberlist", "gossip"}, time.Now(), m.metricLabels) // Get some random live, suspect, or recently dead nodes m.nodeLock.RLock() @@ -653,7 +653,7 @@ func (m *Memberlist) pushPull() { // pushPullNode does a complete state exchange with a specific node. 
func (m *Memberlist) pushPullNode(a Address, join bool) error { - defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now()) + defer metrics.MeasureSinceWithLabels([]string{"memberlist", "pushPullNode"}, time.Now(), m.metricLabels) // Attempt to send and receive with the node remote, userState, err := m.sendAndReceiveState(a, join) @@ -1125,7 +1125,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { } // Update metrics - metrics.IncrCounter([]string{"memberlist", "msg", "alive"}, 1) + metrics.IncrCounterWithLabels([]string{"memberlist", "msg", "alive"}, 1, m.metricLabels) // Notify the delegate of any relevant updates if m.config.Events != nil { @@ -1183,7 +1183,7 @@ func (m *Memberlist) suspectNode(s *suspect) { } // Update metrics - metrics.IncrCounter([]string{"memberlist", "msg", "suspect"}, 1) + metrics.IncrCounterWithLabels([]string{"memberlist", "msg", "suspect"}, 1, m.metricLabels) // Update the state state.Incarnation = s.Incarnation @@ -1221,7 +1221,7 @@ func (m *Memberlist) suspectNode(s *suspect) { if timeout { if k > 0 && numConfirmations < k { - metrics.IncrCounter([]string{"memberlist", "degraded", "timeout"}, 1) + metrics.IncrCounterWithLabels([]string{"memberlist", "degraded", "timeout"}, 1, m.metricLabels) } m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)", @@ -1274,7 +1274,7 @@ func (m *Memberlist) deadNode(d *dead) { } // Update metrics - metrics.IncrCounter([]string{"memberlist", "msg", "dead"}, 1) + metrics.IncrCounterWithLabels([]string{"memberlist", "msg", "dead"}, 1, m.metricLabels) // Update the state state.Incarnation = d.Incarnation