Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics labels to all metrics #270

Merged
merged 4 commits into from Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 8 additions & 4 deletions awareness.go
Expand Up @@ -21,13 +21,17 @@ type awareness struct {
// score is the current awareness score. Lower values are healthier and
// zero is the minimum value.
score int

// metricLabels is the slice of labels to put on all emitted metrics
metricLabels []metrics.Label
}

// newAwareness returns a new awareness object.
func newAwareness(max int) *awareness {
func newAwareness(max int, metricLabels []metrics.Label) *awareness {
return &awareness{
max: max,
score: 0,
max: max,
score: 0,
metricLabels: metricLabels,
}
}

Expand All @@ -47,7 +51,7 @@ func (a *awareness) ApplyDelta(delta int) {
a.Unlock()

if initial != final {
metrics.SetGauge([]string{"memberlist", "health", "score"}, float32(final))
metrics.SetGaugeWithLabels([]string{"memberlist", "health", "score"}, float32(final), a.metricLabels)
}
}

Expand Down
2 changes: 1 addition & 1 deletion awareness_test.go
Expand Up @@ -27,7 +27,7 @@ func TestAwareness(t *testing.T) {
{-1, 0, 1 * time.Second},
}

a := newAwareness(8)
a := newAwareness(8, nil)
for i, c := range cases {
a.ApplyDelta(c.delta)
if a.GetHealthScore() != c.score {
Expand Down
4 changes: 4 additions & 0 deletions config.go
Expand Up @@ -244,10 +244,14 @@ type Config struct {
// RequireNodeNames controls if the name of a node is required when sending
// a message to that node.
RequireNodeNames bool

// CIDRsAllowed If nil, allow any connection (default), otherwise specify all networks
// allowed to connect (you must specify IPv6/IPv4 separately)
// Using [] will block all connections.
CIDRsAllowed []net.IPNet

// MetricLabels is a map of optional labels to apply to all metrics emitted.
MetricLabels map[string]string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to match the serf config and have this be a slice of metric labels?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either way looks good to me. If we switch to MetricLabels []metrics.Label, we can get rid of mapToLabels and pass []metrics.Label all the way from consul to memberlist, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep

}

// ParseCIDRs return a possible empty list of all Network that have been parsed
Expand Down
16 changes: 12 additions & 4 deletions memberlist.go
Expand Up @@ -27,6 +27,7 @@ import (
"sync/atomic"
"time"

"github.com/armon/go-metrics"
multierror "github.com/hashicorp/go-multierror"
sockaddr "github.com/hashicorp/go-sockaddr"
"github.com/miekg/dns"
Expand Down Expand Up @@ -77,6 +78,9 @@ type Memberlist struct {
broadcasts *TransmitLimitedQueue

logger *log.Logger

// metricLabels is the slice of labels to put on all emitted metrics
metricLabels []metrics.Label
}

// BuildVsnArray creates the array of Vsn
Expand Down Expand Up @@ -135,9 +139,10 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
transport := conf.Transport
if transport == nil {
nc := &NetTransportConfig{
BindAddrs: []string{conf.BindAddr},
BindPort: conf.BindPort,
Logger: logger,
BindAddrs: []string{conf.BindAddr},
BindPort: conf.BindPort,
Logger: logger,
MetricLabels: conf.MetricLabels,
}

// See comment below for details about the retry in here.
Expand Down Expand Up @@ -198,6 +203,8 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
}
}

metricLabels := mapToLabels(conf.MetricLabels)

m := &Memberlist{
config: conf,
shutdownCh: make(chan struct{}),
Expand All @@ -208,10 +215,11 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
lowPriorityMsgQueue: list.New(),
nodeMap: make(map[string]*nodeState),
nodeTimers: make(map[string]*suspicion),
awareness: newAwareness(conf.AwarenessMaxMultiplier),
awareness: newAwareness(conf.AwarenessMaxMultiplier, metricLabels),
ackHandlers: make(map[uint32]*ackHandler),
broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
logger: logger,
metricLabels: metricLabels,
}
m.broadcasts.NumNodes = func() int {
return m.estNumNodes()
Expand Down
8 changes: 4 additions & 4 deletions net.go
Expand Up @@ -234,7 +234,7 @@ func (m *Memberlist) handleConn(conn net.Conn) {
defer conn.Close()
m.logger.Printf("[DEBUG] memberlist: Stream connection %s", LogConn(conn))

metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1)
metrics.IncrCounterWithLabels([]string{"memberlist", "tcp", "accept"}, 1, m.metricLabels)

conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))

Expand Down Expand Up @@ -869,7 +869,7 @@ func (m *Memberlist) rawSendMsgPacket(a Address, node *Node, msg []byte) error {
msg = buf.Bytes()
}

metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg)))
metrics.IncrCounterWithLabels([]string{"memberlist", "udp", "sent"}, float32(len(msg)), m.metricLabels)
_, err := m.transport.WriteToAddress(msg, a)
return err
}
Expand Down Expand Up @@ -898,7 +898,7 @@ func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte, streamLabel
}

// Write out the entire send buffer
metrics.IncrCounter([]string{"memberlist", "tcp", "sent"}, float32(len(sendBuf)))
metrics.IncrCounterWithLabels([]string{"memberlist", "tcp", "sent"}, float32(len(sendBuf)), m.metricLabels)

if n, err := conn.Write(sendBuf); err != nil {
return err
Expand Down Expand Up @@ -953,7 +953,7 @@ func (m *Memberlist) sendAndReceiveState(a Address, join bool) ([]pushNodeState,
}
defer conn.Close()
m.logger.Printf("[DEBUG] memberlist: Initiating push/pull sync with: %s %s", a.Name, conn.RemoteAddr())
metrics.IncrCounter([]string{"memberlist", "tcp", "connect"}, 1)
metrics.IncrCounterWithLabels([]string{"memberlist", "tcp", "connect"}, 1, m.metricLabels)

// Send our state
if err := m.sendLocalState(conn, join, m.config.Label); err != nil {
Expand Down
17 changes: 12 additions & 5 deletions net_transport.go
Expand Up @@ -35,6 +35,10 @@ type NetTransportConfig struct {

// Logger is a logger for operator messages.
Logger *log.Logger

// MetricLabels is a map of optional labels to apply to all metrics
// emitted by this transport.
MetricLabels map[string]string
}

// NetTransport is a Transport implementation that uses connectionless UDP for
Expand All @@ -48,6 +52,8 @@ type NetTransport struct {
tcpListeners []*net.TCPListener
udpListeners []*net.UDPConn
shutdown int32

metricLabels []metrics.Label
}

var _ NodeAwareTransport = (*NetTransport)(nil)
Expand All @@ -64,10 +70,11 @@ func NewNetTransport(config *NetTransportConfig) (*NetTransport, error) {
// Build out the new transport.
var ok bool
t := NetTransport{
config: config,
packetCh: make(chan *Packet),
streamCh: make(chan net.Conn),
logger: config.Logger,
config: config,
packetCh: make(chan *Packet),
streamCh: make(chan net.Conn),
logger: config.Logger,
metricLabels: mapToLabels(config.MetricLabels),
}

// Clean up listeners if there's an error.
Expand Down Expand Up @@ -341,7 +348,7 @@ func (t *NetTransport) udpListen(udpLn *net.UDPConn) {
}

// Ingest the packet.
metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n))
metrics.IncrCounterWithLabels([]string{"memberlist", "udp", "received"}, float32(n), t.metricLabels)
t.packetCh <- &Packet{
Buf: buf[:n],
From: addr,
Expand Down
16 changes: 8 additions & 8 deletions state.go
Expand Up @@ -286,14 +286,14 @@ func failedRemote(err error) bool {

// probeNode handles a single round of failure checking on a node.
func (m *Memberlist) probeNode(node *nodeState) {
defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"memberlist", "probeNode"}, time.Now(), m.metricLabels)

// We use our health awareness to scale the overall probe interval, so we
// slow down if we detect problems. The ticker that calls us can handle
// us running over the base interval, and will skip missed ticks.
probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval)
if probeInterval > m.config.ProbeInterval {
metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1)
metrics.IncrCounterWithLabels([]string{"memberlist", "degraded", "probe"}, 1, m.metricLabels)
}

// Prepare a ping message and setup an ack handler.
Expand Down Expand Up @@ -573,7 +573,7 @@ func (m *Memberlist) resetNodes() {
// gossip is invoked every GossipInterval period to broadcast our gossip
// messages to a few random nodes.
func (m *Memberlist) gossip() {
defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"memberlist", "gossip"}, time.Now(), m.metricLabels)

// Get some random live, suspect, or recently dead nodes
m.nodeLock.RLock()
Expand Down Expand Up @@ -653,7 +653,7 @@ func (m *Memberlist) pushPull() {

// pushPullNode does a complete state exchange with a specific node.
func (m *Memberlist) pushPullNode(a Address, join bool) error {
defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"memberlist", "pushPullNode"}, time.Now(), m.metricLabels)

// Attempt to send and receive with the node
remote, userState, err := m.sendAndReceiveState(a, join)
Expand Down Expand Up @@ -1125,7 +1125,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
}

// Update metrics
metrics.IncrCounter([]string{"memberlist", "msg", "alive"}, 1)
metrics.IncrCounterWithLabels([]string{"memberlist", "msg", "alive"}, 1, m.metricLabels)

// Notify the delegate of any relevant updates
if m.config.Events != nil {
Expand Down Expand Up @@ -1183,7 +1183,7 @@ func (m *Memberlist) suspectNode(s *suspect) {
}

// Update metrics
metrics.IncrCounter([]string{"memberlist", "msg", "suspect"}, 1)
metrics.IncrCounterWithLabels([]string{"memberlist", "msg", "suspect"}, 1, m.metricLabels)

// Update the state
state.Incarnation = s.Incarnation
Expand Down Expand Up @@ -1221,7 +1221,7 @@ func (m *Memberlist) suspectNode(s *suspect) {

if timeout {
if k > 0 && numConfirmations < k {
metrics.IncrCounter([]string{"memberlist", "degraded", "timeout"}, 1)
metrics.IncrCounterWithLabels([]string{"memberlist", "degraded", "timeout"}, 1, m.metricLabels)
}

m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)",
Expand Down Expand Up @@ -1274,7 +1274,7 @@ func (m *Memberlist) deadNode(d *dead) {
}

// Update metrics
metrics.IncrCounter([]string{"memberlist", "msg", "dead"}, 1)
metrics.IncrCounterWithLabels([]string{"memberlist", "msg", "dead"}, 1, m.metricLabels)

// Update the state
state.Incarnation = d.Incarnation
Expand Down
15 changes: 15 additions & 0 deletions util.go
Expand Up @@ -13,6 +13,7 @@ import (
"strings"
"time"

metrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-msgpack/codec"
"github.com/sean-/seed"
)
Expand Down Expand Up @@ -324,3 +325,17 @@ func ensurePort(s string, port int) string {
s = net.JoinHostPort(s, strconv.Itoa(port))
return s
}

func mapToLabels(m map[string]string) []metrics.Label {
if len(m) == 0 {
return nil
}
out := make([]metrics.Label, 0, len(m))
for k, v := range m {
out = append(out, metrics.Label{
Name: k,
Value: v,
})
}
return out
}
27 changes: 27 additions & 0 deletions util_test.go
Expand Up @@ -384,3 +384,30 @@ func TestCompressDecompressPayload(t *testing.T) {
t.Fatalf("bad payload: %v", decomp)
}
}

func TestUtil_MapToLabels(t *testing.T) {
tests := []struct {
name string
labels map[string]string
}{
{"empty labels", nil},
{
name: "multiple labels",
labels: map[string]string{
"name1": "value1",
"name2": "value2",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := mapToLabels(tt.labels)
countLabels := 0
for _, kv := range got {
require.Equal(t, tt.labels[kv.Name], kv.Value)
countLabels++
}
require.Equal(t, len(tt.labels), countLabels)
})
}
}