Skip to content

Commit

Permalink
Merge pull request #2299 from Shopify/dnwe/fetch-request-metrics
Browse files Browse the repository at this point in the history
feat(metrics): track consumer fetch request rates
  • Loading branch information
dnwe committed Aug 11, 2022
2 parents 820d5b2 + 5b04c98 commit 3083a9b
Show file tree
Hide file tree
Showing 21 changed files with 127 additions and 39 deletions.
4 changes: 2 additions & 2 deletions balance_strategy.go
Expand Up @@ -609,9 +609,9 @@ func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscripti
// Deserialize topic partition assignment data to aid with creation of a sticky assignment.
func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
userDataV1 := &StickyAssignorUserDataV1{}
if err := decode(userDataBytes, userDataV1); err != nil {
if err := decode(userDataBytes, userDataV1, nil); err != nil {
userDataV0 := &StickyAssignorUserDataV0{}
if err := decode(userDataBytes, userDataV0); err != nil {
if err := decode(userDataBytes, userDataV0, nil); err != nil {
return nil, err
}
return userDataV0, nil
Expand Down
29 changes: 21 additions & 8 deletions broker.go
Expand Up @@ -37,6 +37,7 @@ type Broker struct {

incomingByteRate metrics.Meter
requestRate metrics.Meter
fetchRate metrics.Meter
requestSize metrics.Histogram
requestLatency metrics.Histogram
outgoingByteRate metrics.Meter
Expand All @@ -45,6 +46,7 @@ type Broker struct {
requestsInFlight metrics.Counter
brokerIncomingByteRate metrics.Meter
brokerRequestRate metrics.Meter
brokerFetchRate metrics.Meter
brokerRequestSize metrics.Histogram
brokerRequestLatency metrics.Histogram
brokerOutgoingByteRate metrics.Meter
Expand Down Expand Up @@ -208,6 +210,7 @@ func (b *Broker) Open(conf *Config) error {
// Create or reuse the global metrics shared between brokers
b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
b.fetchRate = metrics.GetOrRegisterMeter("consumer-fetch-rate", conf.MetricRegistry)
b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
Expand Down Expand Up @@ -432,7 +435,7 @@ func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error
return
}

if err := versionedDecode(packets, res, request.version()); err != nil {
if err := versionedDecode(packets, res, request.version(), b.conf.MetricRegistry); err != nil {
// Malformed response
cb(nil, err)
return
Expand Down Expand Up @@ -472,6 +475,15 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {

// Fetch returns a FetchResponse or error
func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
defer func() {
if b.fetchRate != nil {
b.fetchRate.Mark(1)
}
if b.brokerFetchRate != nil {
b.brokerFetchRate.Mark(1)
}
}()

response := new(FetchResponse)

err := b.sendAndReceive(request, response)
Expand Down Expand Up @@ -1011,13 +1023,13 @@ func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
return nil
}

return handleResponsePromise(req, res, promise)
return b.handleResponsePromise(req, res, promise)
}

func handleResponsePromise(req protocolBody, res protocolBody, promise *responsePromise) error {
func (b *Broker) handleResponsePromise(req protocolBody, res protocolBody, promise *responsePromise) error {
select {
case buf := <-promise.packets:
return versionedDecode(buf, res, req.version())
return versionedDecode(buf, res, req.version(), b.conf.MetricRegistry)
case err := <-promise.errors:
return err
}
Expand Down Expand Up @@ -1109,7 +1121,7 @@ func (b *Broker) responseReceiver() {
}

decodedHeader := responseHeader{}
err = versionedDecode(header, &decodedHeader, response.headerVersion)
err = versionedDecode(header, &decodedHeader, response.headerVersion, b.conf.MetricRegistry)
if err != nil {
b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
dead = err
Expand Down Expand Up @@ -1170,7 +1182,7 @@ func (b *Broker) authenticateViaSASLv1() error {
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
return handshakeErr
}
handshakeErr = handleResponsePromise(handshakeRequest, handshakeResponse, prom)
handshakeErr = b.handleResponsePromise(handshakeRequest, handshakeResponse, prom)
if handshakeErr != nil {
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
return handshakeErr
Expand All @@ -1190,7 +1202,7 @@ func (b *Broker) authenticateViaSASLv1() error {
Logger.Printf("Error while performing SASL Auth %s\n", b.addr)
return nil, authErr
}
authErr = handleResponsePromise(authenticateRequest, authenticateResponse, prom)
authErr = b.handleResponsePromise(authenticateRequest, authenticateResponse, prom)
if authErr != nil {
Logger.Printf("Error while performing SASL Auth %s\n", b.addr)
return nil, authErr
Expand Down Expand Up @@ -1268,7 +1280,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
res := &SaslHandshakeResponse{}

err = versionedDecode(payload, res, 0)
err = versionedDecode(payload, res, 0, b.conf.MetricRegistry)
if err != nil {
Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
return err
Expand Down Expand Up @@ -1600,6 +1612,7 @@ func (b *Broker) updateThrottleMetric(throttleTime time.Duration) {
func (b *Broker) registerMetrics() {
b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
b.brokerRequestRate = b.registerMeter("request-rate")
b.brokerFetchRate = b.registerMeter("consumer-fetch-rate")
b.brokerRequestSize = b.registerHistogram("request-size")
b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
Expand Down
5 changes: 4 additions & 1 deletion broker_test.go
Expand Up @@ -123,7 +123,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
})
broker := NewBroker(mb.Addr())
// Set the broker id in order to validate local broker metrics
// Set the broker id in order to validate local broker metrics
broker.id = 0
conf := NewTestConfig()
conf.ApiVersionsRequest = false
Expand All @@ -132,6 +132,9 @@ func TestSimpleBrokerCommunication(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if _, err := broker.Connected(); err != nil {
t.Error(err)
}
tt.runner(t, broker)
// Wait up to 500 ms for the remote broker to process the request and
// notify us about the metrics
Expand Down
8 changes: 4 additions & 4 deletions consumer_group_members_test.go
Expand Up @@ -59,7 +59,7 @@ func TestConsumerGroupMemberMetadata(t *testing.T) {
}

meta2 := new(ConsumerGroupMemberMetadata)
err = decode(buf, meta2)
err = decode(buf, meta2, nil)
if err != nil {
t.Error("Failed to decode data", err)
} else if !reflect.DeepEqual(meta, meta2) {
Expand All @@ -69,10 +69,10 @@ func TestConsumerGroupMemberMetadata(t *testing.T) {

func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) {
meta := new(ConsumerGroupMemberMetadata)
if err := decode(groupMemberMetadataV1, meta); err != nil {
if err := decode(groupMemberMetadataV1, meta, nil); err != nil {
t.Error("Failed to decode V1 data", err)
}
if err := decode(groupMemberMetadataV1Bad, meta); err != nil {
if err := decode(groupMemberMetadataV1Bad, meta, nil); err != nil {
t.Error("Failed to decode V1 'bad' data", err)
}
}
Expand All @@ -94,7 +94,7 @@ func TestConsumerGroupMemberAssignment(t *testing.T) {
}

amt2 := new(ConsumerGroupMemberAssignment)
err = decode(buf, amt2)
err = decode(buf, amt2, nil)
if err != nil {
t.Error("Failed to decode data", err)
} else if !reflect.DeepEqual(amt, amt2) {
Expand Down
2 changes: 1 addition & 1 deletion consumer_metadata_response_test.go
Expand Up @@ -26,7 +26,7 @@ func TestConsumerMetadataResponseError(t *testing.T) {
testEncodable(t, "", response, consumerMetadataResponseError)

decodedResp := &ConsumerMetadataResponse{}
if err := versionedDecode(consumerMetadataResponseError, decodedResp, 0); err != nil {
if err := versionedDecode(consumerMetadataResponseError, decodedResp, 0, nil); err != nil {
t.Error("could not decode: ", err)
}

Expand Down
4 changes: 2 additions & 2 deletions describe_groups_response.go
Expand Up @@ -250,7 +250,7 @@ func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAs
return nil, nil
}
assignment := new(ConsumerGroupMemberAssignment)
err := decode(gmd.MemberAssignment, assignment)
err := decode(gmd.MemberAssignment, assignment, nil)
return assignment, err
}

Expand All @@ -259,6 +259,6 @@ func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMeta
return nil, nil
}
metadata := new(ConsumerGroupMemberMetadata)
err := decode(gmd.MemberMetadata, metadata)
err := decode(gmd.MemberMetadata, metadata, nil)
return metadata, err
}
14 changes: 10 additions & 4 deletions encoder_decoder.go
Expand Up @@ -57,12 +57,15 @@ type versionedDecoder interface {

// decode takes bytes and a decoder and fills the fields of the decoder from the bytes,
// interpreted using Kafka's encoding rules.
func decode(buf []byte, in decoder) error {
func decode(buf []byte, in decoder, metricRegistry metrics.Registry) error {
if buf == nil {
return nil
}

helper := realDecoder{raw: buf}
helper := realDecoder{
raw: buf,
registry: metricRegistry,
}
err := in.decode(&helper)
if err != nil {
return err
Expand All @@ -75,12 +78,15 @@ func decode(buf []byte, in decoder) error {
return nil
}

func versionedDecode(buf []byte, in versionedDecoder, version int16) error {
func versionedDecode(buf []byte, in versionedDecoder, version int16, metricRegistry metrics.Registry) error {
if buf == nil {
return nil
}

helper := realDecoder{raw: buf}
helper := realDecoder{
raw: buf,
registry: metricRegistry,
}
err := in.decode(&helper, version)
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions fetch_request.go
Expand Up @@ -95,6 +95,8 @@ const (
)

func (r *FetchRequest) encode(pe packetEncoder) (err error) {
metricRegistry := pe.metricRegistry()

pe.putInt32(-1) // ReplicaID is always -1 for clients
pe.putInt32(r.MaxWaitTime)
pe.putInt32(r.MinBytes)
Expand Down Expand Up @@ -128,6 +130,7 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
return err
}
}
getOrRegisterTopicMeter("consumer-fetch-rate", topic, metricRegistry).Mark(1)
}
if r.Version >= 7 {
err = pe.putArrayLength(len(r.forgotten))
Expand Down
11 changes: 11 additions & 0 deletions fetch_response.go
Expand Up @@ -4,6 +4,8 @@ import (
"errors"
"sort"
"time"

"github.com/rcrowley/go-metrics"
)

const invalidPreferredReplicaID = -1
Expand Down Expand Up @@ -60,6 +62,12 @@ type FetchResponseBlock struct {
}

func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
metricRegistry := pd.metricRegistry()
var sizeMetric metrics.Histogram
if metricRegistry != nil {
sizeMetric = getOrRegisterHistogram("consumer-fetch-response-size", metricRegistry)
}

tmp, err := pd.getInt16()
if err != nil {
return err
Expand Down Expand Up @@ -115,6 +123,9 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
if err != nil {
return err
}
if sizeMetric != nil {
sizeMetric.Update(int64(recordsSize))
}

recordsDecoder, err := pd.getSubset(int(recordsSize))
if err != nil {
Expand Down
37 changes: 35 additions & 2 deletions functional_producer_test.go
Expand Up @@ -307,7 +307,7 @@ func testProducingMessages(t *testing.T, config *Config) {
safeClose(t, producer)

// Validate producer metrics before using the consumer minus the offset request
validateMetrics(t, client)
validateProducerMetrics(t, client)

master, err := NewConsumerFromClient(client)
if err != nil {
Expand All @@ -332,6 +332,9 @@ func testProducingMessages(t *testing.T, config *Config) {
}
}
}

validateConsumerMetrics(t, client)

safeClose(t, consumer)
safeClose(t, client)
}
Expand Down Expand Up @@ -397,7 +400,7 @@ func TestAsyncProducerRemoteBrokerClosed(t *testing.T) {
closeProducer(t, producer)
}

func validateMetrics(t *testing.T, client Client) {
func validateProducerMetrics(t *testing.T, client Client) {
// Get the broker used by test1 topic
var broker *Broker
if partitions, err := client.Partitions("test.1"); err != nil {
Expand Down Expand Up @@ -486,6 +489,36 @@ func validateMetrics(t *testing.T, client Client) {
metricValidators.run(t, client.Config().MetricRegistry)
}

// validateConsumerMetrics asserts that consuming from the "test.1" topic
// recorded at least one fetch request in the "consumer-fetch-rate" meters:
// the global meter, the per-topic meter, and the meter of the broker that
// leads the topic's partitions.
//
// The test is aborted if the partitions cannot be listed, if they are led by
// more than one broker, or if no leader can be resolved at all.
func validateConsumerMetrics(t *testing.T, client Client) {
	// Report failures at the caller's line, like the other metric validators.
	t.Helper()

	// Find the single broker leading every partition of the test.1 topic.
	partitions, err := client.Partitions("test.1")
	if err != nil {
		t.Fatal(err)
	}

	var broker *Broker
	for _, partition := range partitions {
		leader, err := client.Leader("test.1", partition)
		if err != nil {
			t.Error(err)
			continue
		}
		if broker != nil && leader != broker {
			t.Fatal("Expected only one broker, got at least 2")
		}
		broker = leader
	}

	// Without a leader there is no broker-level meter to validate; stop here
	// rather than handing a nil broker to the validators.
	if broker == nil {
		t.Fatal("Expected a leader broker for topic test.1, got none")
	}

	metricValidators := newMetricValidators()

	// at least 1 global fetch request for the given topic
	// (the topic is registered under the name "test_1" here)
	metricValidators.registerForGlobalAndTopic("test_1", minCountMeterValidator("consumer-fetch-rate", 1))

	// and at least 1 fetch request to the lead broker
	metricValidators.registerForBroker(broker, minCountMeterValidator("consumer-fetch-rate", 1))

	// Run the validators
	metricValidators.run(t, client.Config().MetricRegistry)
}

// Benchmarks

func BenchmarkProducerSmall(b *testing.B) {
Expand Down
2 changes: 1 addition & 1 deletion join_group_response.go
Expand Up @@ -21,7 +21,7 @@ func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata
members := make(map[string]ConsumerGroupMemberMetadata, len(r.Members))
for _, member := range r.Members {
meta := new(ConsumerGroupMemberMetadata)
if err := decode(member.Metadata, meta); err != nil {
if err := decode(member.Metadata, meta, nil); err != nil {
return nil, err
}
members[member.MemberId] = *meta
Expand Down
2 changes: 1 addition & 1 deletion message_test.go
Expand Up @@ -236,7 +236,7 @@ func TestMessageDecodingVersion1(t *testing.T) {

func TestMessageDecodingUnknownVersions(t *testing.T) {
message := Message{Version: 2}
err := decode(emptyV2Message, &message)
err := decode(emptyV2Message, &message, nil)
if err == nil {
t.Error("Decoding did not produce an error for an unknown magic byte")
}
Expand Down
3 changes: 3 additions & 0 deletions metrics_test.go
Expand Up @@ -68,6 +68,7 @@ func (m *metricValidators) registerForAllBrokers(broker *Broker, validator *metr
}

func (m metricValidators) run(t *testing.T, r metrics.Registry) {
t.Helper()
for _, metricValidator := range m {
metric := r.Get(metricValidator.name)
if metric == nil {
Expand All @@ -82,6 +83,7 @@ func meterValidator(name string, extraValidator func(*testing.T, metrics.Meter))
return &metricValidator{
name: name,
validator: func(t *testing.T, metric interface{}) {
t.Helper()
if meter, ok := metric.(metrics.Meter); !ok {
t.Errorf("Expected meter metric for '%s', got %T", name, metric)
} else {
Expand All @@ -93,6 +95,7 @@ func meterValidator(name string, extraValidator func(*testing.T, metrics.Meter))

func countMeterValidator(name string, expectedCount int) *metricValidator {
return meterValidator(name, func(t *testing.T, meter metrics.Meter) {
t.Helper()
count := meter.Count()
if count != int64(expectedCount) {
t.Errorf("Expected meter metric '%s' count = %d, got %d", name, expectedCount, count)
Expand Down

0 comments on commit 3083a9b

Please sign in to comment.