diff --git a/balance_strategy.go b/balance_strategy.go index d6b47b4d0..4594df6f6 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -609,9 +609,9 @@ func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscripti // Deserialize topic partition assignment data to aid with creation of a sticky assignment. func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) { userDataV1 := &StickyAssignorUserDataV1{} - if err := decode(userDataBytes, userDataV1); err != nil { + if err := decode(userDataBytes, userDataV1, nil); err != nil { userDataV0 := &StickyAssignorUserDataV0{} - if err := decode(userDataBytes, userDataV0); err != nil { + if err := decode(userDataBytes, userDataV0, nil); err != nil { return nil, err } return userDataV0, nil diff --git a/broker.go b/broker.go index bbcbd1e04..d857c7048 100644 --- a/broker.go +++ b/broker.go @@ -37,6 +37,7 @@ type Broker struct { incomingByteRate metrics.Meter requestRate metrics.Meter + fetchRate metrics.Meter requestSize metrics.Histogram requestLatency metrics.Histogram outgoingByteRate metrics.Meter @@ -45,6 +46,7 @@ type Broker struct { requestsInFlight metrics.Counter brokerIncomingByteRate metrics.Meter brokerRequestRate metrics.Meter + brokerFetchRate metrics.Meter brokerRequestSize metrics.Histogram brokerRequestLatency metrics.Histogram brokerOutgoingByteRate metrics.Meter @@ -208,6 +210,7 @@ func (b *Broker) Open(conf *Config) error { // Create or reuse the global metrics shared between brokers b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry) b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry) + b.fetchRate = metrics.GetOrRegisterMeter("consumer-fetch-rate", conf.MetricRegistry) b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry) b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry) b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry) @@ -432,7 +435,7 @@ func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error return } - if err := versionedDecode(packets, res, request.version()); err != nil { + if err := versionedDecode(packets, res, request.version(), b.conf.MetricRegistry); err != nil { // Malformed response cb(nil, err) return @@ -472,6 +475,15 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) { // Fetch returns a FetchResponse or error func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) { + defer func() { + if b.fetchRate != nil { + b.fetchRate.Mark(1) + } + if b.brokerFetchRate != nil { + b.brokerFetchRate.Mark(1) + } + }() + response := new(FetchResponse) err := b.sendAndReceive(request, response) @@ -1011,13 +1023,13 @@ func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error { return nil } - return handleResponsePromise(req, res, promise) + return b.handleResponsePromise(req, res, promise) } -func handleResponsePromise(req protocolBody, res protocolBody, promise *responsePromise) error { +func (b *Broker) handleResponsePromise(req protocolBody, res protocolBody, promise *responsePromise) error { select { case buf := <-promise.packets: - return versionedDecode(buf, res, req.version()) + return versionedDecode(buf, res, req.version(), b.conf.MetricRegistry) case err := <-promise.errors: return err } @@ -1109,7 +1121,7 @@ func (b *Broker) responseReceiver() { } decodedHeader := responseHeader{} - err = versionedDecode(header, &decodedHeader, response.headerVersion) + err = versionedDecode(header, &decodedHeader, response.headerVersion, b.conf.MetricRegistry) if err != nil { b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency) dead = err @@ -1170,7 +1182,7 @@ func (b *Broker) authenticateViaSASLv1() error { Logger.Printf("Error while performing SASL handshake %s\n", b.addr) return handshakeErr } - handshakeErr = handleResponsePromise(handshakeRequest, handshakeResponse, prom) + handshakeErr = b.handleResponsePromise(handshakeRequest, handshakeResponse, prom) if handshakeErr != nil { Logger.Printf("Error while performing SASL handshake %s\n", b.addr) return handshakeErr @@ -1190,7 +1202,7 @@ func (b *Broker) authenticateViaSASLv1() error { Logger.Printf("Error while performing SASL Auth %s\n", b.addr) return nil, authErr } - authErr = handleResponsePromise(authenticateRequest, authenticateResponse, prom) + authErr = b.handleResponsePromise(authenticateRequest, authenticateResponse, prom) if authErr != nil { Logger.Printf("Error while performing SASL Auth %s\n", b.addr) return nil, authErr @@ -1268,7 +1280,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime)) res := &SaslHandshakeResponse{} - err = versionedDecode(payload, res, 0) + err = versionedDecode(payload, res, 0, b.conf.MetricRegistry) if err != nil { Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error()) return err @@ -1600,6 +1612,7 @@ func (b *Broker) updateThrottleMetric(throttleTime time.Duration) { func (b *Broker) registerMetrics() { b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate") b.brokerRequestRate = b.registerMeter("request-rate") + b.brokerFetchRate = b.registerMeter("consumer-fetch-rate") b.brokerRequestSize = b.registerHistogram("request-size") b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms") b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate") diff --git a/broker_test.go b/broker_test.go index 9310d8b90..52a4e4bae 100644 --- a/broker_test.go +++ b/broker_test.go @@ -123,7 +123,7 @@ func TestSimpleBrokerCommunication(t *testing.T) { pendingNotify <- brokerMetrics{bytesRead, bytesWritten} }) broker := NewBroker(mb.Addr()) - // Set the broker id in order to validate local broker metrics + // Set the broker id in order to validate local broujhjker metrics broker.id = 0 conf := NewTestConfig() conf.ApiVersionsRequest = false @@ -132,6 +132,9 @@ func TestSimpleBrokerCommunication(t *testing.T) { if err != nil { t.Fatal(err) } + if _, err := broker.Connected(); err != nil { + t.Error(err) + } tt.runner(t, broker) // Wait up to 500 ms for the remote broker to process the request and // notify us about the metrics diff --git a/consumer_group_members_test.go b/consumer_group_members_test.go index a99de61c6..8c1c2d56d 100644 --- a/consumer_group_members_test.go +++ b/consumer_group_members_test.go @@ -59,7 +59,7 @@ func TestConsumerGroupMemberMetadata(t *testing.T) { } meta2 := new(ConsumerGroupMemberMetadata) - err = decode(buf, meta2) + err = decode(buf, meta2, nil) if err != nil { t.Error("Failed to decode data", err) } else if !reflect.DeepEqual(meta, meta2) { @@ -69,10 +69,10 @@ func TestConsumerGroupMemberMetadata(t *testing.T) { func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) { meta := new(ConsumerGroupMemberMetadata) - if err := decode(groupMemberMetadataV1, meta); err != nil { + if err := decode(groupMemberMetadataV1, meta, nil); err != nil { t.Error("Failed to decode V1 data", err) } - if err := decode(groupMemberMetadataV1Bad, meta); err != nil { + if err := decode(groupMemberMetadataV1Bad, meta, nil); err != nil { t.Error("Failed to decode V1 'bad' data", err) } } @@ -94,7 +94,7 @@ func TestConsumerGroupMemberAssignment(t *testing.T) { } amt2 := new(ConsumerGroupMemberAssignment) - err = decode(buf, amt2) + err = decode(buf, amt2, nil) if err != nil { t.Error("Failed to decode data", err) } else if !reflect.DeepEqual(amt, amt2) { diff --git a/consumer_metadata_response_test.go b/consumer_metadata_response_test.go index e97707ddc..cdd489ec1 100644 --- a/consumer_metadata_response_test.go +++ b/consumer_metadata_response_test.go @@ -26,7 +26,7 @@ func TestConsumerMetadataResponseError(t *testing.T) { testEncodable(t, "", response, consumerMetadataResponseError) decodedResp := &ConsumerMetadataResponse{} - if err := versionedDecode(consumerMetadataResponseError, decodedResp, 0); err != nil { + if err := versionedDecode(consumerMetadataResponseError, decodedResp, 0, nil); err != nil { t.Error("could not decode: ", err) } diff --git a/describe_groups_response.go b/describe_groups_response.go index a449d0b0e..09052e431 100644 --- a/describe_groups_response.go +++ b/describe_groups_response.go @@ -250,7 +250,7 @@ func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAs return nil, nil } assignment := new(ConsumerGroupMemberAssignment) - err := decode(gmd.MemberAssignment, assignment) + err := decode(gmd.MemberAssignment, assignment, nil) return assignment, err } @@ -259,6 +259,6 @@ func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMeta return nil, nil } metadata := new(ConsumerGroupMemberMetadata) - err := decode(gmd.MemberMetadata, metadata) + err := decode(gmd.MemberMetadata, metadata, nil) return metadata, err } diff --git a/encoder_decoder.go b/encoder_decoder.go index 4aab4d21d..4ee76c6d7 100644 --- a/encoder_decoder.go +++ b/encoder_decoder.go @@ -57,12 +57,15 @@ type versionedDecoder interface { // decode takes bytes and a decoder and fills the fields of the decoder from the bytes, // interpreted using Kafka's encoding rules. -func decode(buf []byte, in decoder) error { +func decode(buf []byte, in decoder, metricRegistry metrics.Registry) error { if buf == nil { return nil } - helper := realDecoder{raw: buf} + helper := realDecoder{ + raw: buf, + registry: metricRegistry, + } err := in.decode(&helper) if err != nil { return err @@ -75,12 +78,15 @@ func decode(buf []byte, in decoder) error { return nil } -func versionedDecode(buf []byte, in versionedDecoder, version int16) error { +func versionedDecode(buf []byte, in versionedDecoder, version int16, metricRegistry metrics.Registry) error { if buf == nil { return nil } - helper := realDecoder{raw: buf} + helper := realDecoder{ + raw: buf, + registry: metricRegistry, + } err := in.decode(&helper, version) if err != nil { return err diff --git a/fetch_request.go b/fetch_request.go index 4da5a1d2d..63190a2b0 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -95,6 +95,8 @@ const ( ) func (r *FetchRequest) encode(pe packetEncoder) (err error) { + metricRegistry := pe.metricRegistry() + pe.putInt32(-1) // ReplicaID is always -1 for clients pe.putInt32(r.MaxWaitTime) pe.putInt32(r.MinBytes) @@ -128,6 +130,7 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) { return err } } + getOrRegisterTopicMeter("consumer-fetch-rate", topic, metricRegistry).Mark(1) } if r.Version >= 7 { err = pe.putArrayLength(len(r.forgotten)) diff --git a/fetch_response.go b/fetch_response.go index fc2c1c714..c8ad6046a 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -4,6 +4,8 @@ import ( "errors" "sort" "time" + + "github.com/rcrowley/go-metrics" ) const invalidPreferredReplicaID = -1 @@ -60,6 +62,12 @@ type FetchResponseBlock struct { } func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) { + metricRegistry := pd.metricRegistry() + var sizeMetric metrics.Histogram + if metricRegistry != nil { + sizeMetric = getOrRegisterHistogram("consumer-fetch-response-size", metricRegistry) + } + tmp, err := pd.getInt16() if err != nil { return err @@ -115,6 +123,9 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) if err != nil { return err } + if sizeMetric != nil { + sizeMetric.Update(int64(recordsSize)) + } recordsDecoder, err := pd.getSubset(int(recordsSize)) if err != nil { diff --git a/functional_producer_test.go b/functional_producer_test.go index eec20ff36..1332a423c 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -307,7 +307,7 @@ func testProducingMessages(t *testing.T, config *Config) { safeClose(t, producer) // Validate producer metrics before using the consumer minus the offset request - validateMetrics(t, client) + validateProducerMetrics(t, client) master, err := NewConsumerFromClient(client) if err != nil { @@ -332,6 +332,9 @@ func testProducingMessages(t *testing.T, config *Config) { } } } + + validateConsumerMetrics(t, client) + safeClose(t, consumer) safeClose(t, client) } @@ -397,7 +400,7 @@ func TestAsyncProducerRemoteBrokerClosed(t *testing.T) { closeProducer(t, producer) } -func validateMetrics(t *testing.T, client Client) { +func validateProducerMetrics(t *testing.T, client Client) { // Get the broker used by test1 topic var broker *Broker if partitions, err := client.Partitions("test.1"); err != nil { @@ -486,6 +489,36 @@ func validateMetrics(t *testing.T, client Client) { metricValidators.run(t, client.Config().MetricRegistry) } +func validateConsumerMetrics(t *testing.T, client Client) { + // Get the broker used by test1 topic + var broker *Broker + if partitions, err := client.Partitions("test.1"); err != nil { + t.Error(err) + } else { + for _, partition := range partitions { + if b, err := client.Leader("test.1", partition); err != nil { + t.Error(err) + } else { + if broker != nil && b != broker { + t.Fatal("Expected only one broker, got at least 2") + } + broker = b + } + } + } + + metricValidators := newMetricValidators() + + // at least 1 global fetch request for the given topic + metricValidators.registerForGlobalAndTopic("test_1", minCountMeterValidator("consumer-fetch-rate", 1)) + + // and at least 1 fetch request to the lead broker + metricValidators.registerForBroker(broker, minCountMeterValidator("consumer-fetch-rate", 1)) + + // Run the validators + metricValidators.run(t, client.Config().MetricRegistry) +} + // Benchmarks func BenchmarkProducerSmall(b *testing.B) { diff --git a/join_group_response.go b/join_group_response.go index 16342a3a4..d8aa1f002 100644 --- a/join_group_response.go +++ b/join_group_response.go @@ -21,7 +21,7 @@ func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata members := make(map[string]ConsumerGroupMemberMetadata, len(r.Members)) for _, member := range r.Members { meta := new(ConsumerGroupMemberMetadata) - if err := decode(member.Metadata, meta); err != nil { + if err := decode(member.Metadata, meta, nil); err != nil { return nil, err } members[member.MemberId] = *meta diff --git a/message_test.go b/message_test.go index d7bd430d3..e2148babf 100644 --- a/message_test.go +++ b/message_test.go @@ -236,7 +236,7 @@ func TestMessageDecodingVersion1(t *testing.T) { func TestMessageDecodingUnknownVersions(t *testing.T) { message := Message{Version: 2} - err := decode(emptyV2Message, &message) + err := decode(emptyV2Message, &message, nil) if err == nil { t.Error("Decoding did not produce an error for an unknown magic byte") } diff --git a/metrics_test.go b/metrics_test.go index 7572f5b90..ee7aea374 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -68,6 +68,7 @@ func (m *metricValidators) registerForAllBrokers(broker *Broker, validator *metr } func (m metricValidators) run(t *testing.T, r metrics.Registry) { + t.Helper() for _, metricValidator := range m { metric := r.Get(metricValidator.name) if metric == nil { @@ -82,6 +83,7 @@ func meterValidator(name string, extraValidator func(*testing.T, metrics.Meter)) return &metricValidator{ name: name, validator: func(t *testing.T, metric interface{}) { + t.Helper() if meter, ok := metric.(metrics.Meter); !ok { t.Errorf("Expected meter metric for '%s', got %T", name, metric) } else { @@ -93,6 +95,7 @@ func meterValidator(name string, extraValidator func(*testing.T, metrics.Meter)) func countMeterValidator(name string, expectedCount int) *metricValidator { return meterValidator(name, func(t *testing.T, meter metrics.Meter) { + t.Helper() count := meter.Count() if count != int64(expectedCount) { t.Errorf("Expected meter metric '%s' count = %d, got %d", name, expectedCount, count) diff --git a/packet_decoder.go b/packet_decoder.go index 08b433223..b8cae5350 100644 --- a/packet_decoder.go +++ b/packet_decoder.go @@ -1,5 +1,7 @@ package sarama +import "github.com/rcrowley/go-metrics" + // PacketDecoder is the interface providing helpers for reading with Kafka's encoding rules. // Types implementing Decoder only need to worry about calling methods like GetString, // not about how a string is represented in Kafka. @@ -40,6 +42,9 @@ type packetDecoder interface { // Stacks, see PushDecoder push(in pushDecoder) error pop() error + + // To record metrics when provided + metricRegistry() metrics.Registry } // PushDecoder is the interface for decoding fields like CRCs and lengths where the validity diff --git a/real_decoder.go b/real_decoder.go index a5b54ebb3..7e37641f9 100644 --- a/real_decoder.go +++ b/real_decoder.go @@ -3,6 +3,8 @@ package sarama import ( "encoding/binary" "math" + + "github.com/rcrowley/go-metrics" ) var ( @@ -15,9 +17,10 @@ var ( ) type realDecoder struct { - raw []byte - off int - stack []pushDecoder + raw []byte + off int + stack []pushDecoder + registry metrics.Registry } // primitives @@ -459,3 +462,7 @@ func (rd *realDecoder) pop() error { return in.check(rd.off, rd.raw) } + +func (rd *realDecoder) metricRegistry() metrics.Registry { + return rd.registry +} diff --git a/record_batch.go b/record_batch.go index d7ef2616c..d382ca488 100644 --- a/record_batch.go +++ b/record_batch.go @@ -186,7 +186,7 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) { } b.recordsLen = len(recBuffer) - err = decode(recBuffer, recordsArray(b.Records)) + err = decode(recBuffer, recordsArray(b.Records), nil) if errors.Is(err, ErrInsufficientData) { b.PartialTrailingRecord = true b.Records = nil diff --git a/records_test.go b/records_test.go index 34f1b4a6e..621e4b32d 100644 --- a/records_test.go +++ b/records_test.go @@ -33,11 +33,11 @@ func TestLegacyRecords(t *testing.T) { set = &MessageSet{} r = Records{} - err = decode(exp, set) + err = decode(exp, set, nil) if err != nil { t.Fatal(err) } - err = decode(buf, &r) + err = decode(buf, &r, nil) if err != nil { t.Fatal(err) } @@ -110,11 +110,11 @@ func TestDefaultRecords(t *testing.T) { batch = &RecordBatch{} r = Records{} - err = decode(exp, batch) + err = decode(exp, batch, nil) if err != nil { t.Fatal(err) } - err = decode(buf, &r) + err = decode(buf, &r, nil) if err != nil { t.Fatal(err) } diff --git a/request.go b/request.go index ce90eb8c4..a2c92f8a9 100644 --- a/request.go +++ b/request.go @@ -109,7 +109,7 @@ func decodeRequest(r io.Reader) (*request, int, error) { bytesRead += len(encodedReq) req := &request{} - if err := decode(encodedReq, req); err != nil { + if err := decode(encodedReq, req, nil); err != nil { return nil, bytesRead, err } diff --git a/request_test.go b/request_test.go index b04dbe264..193806994 100644 --- a/request_test.go +++ b/request_test.go @@ -21,14 +21,14 @@ func testEncodable(t *testing.T, name string, in encoder, expect []byte) { } func testDecodable(t *testing.T, name string, out decoder, in []byte) { - err := decode(in, out) + err := decode(in, out, nil) if err != nil { t.Error("Decoding", name, "failed:", err) } } func testVersionDecodable(t *testing.T, name string, out versionedDecoder, in []byte, version int16) { - err := versionedDecode(in, out, version) + err := versionedDecode(in, out, version, nil) if err != nil { t.Error("Decoding", name, "version", version, "failed:", err) } @@ -99,7 +99,7 @@ func testResponse(t *testing.T, name string, res protocolBody, expected []byte) } decoded := reflect.New(reflect.TypeOf(res).Elem()).Interface().(versionedDecoder) - if err := versionedDecode(encoded, decoded, res.version()); err != nil { + if err := versionedDecode(encoded, decoded, res.version(), nil); err != nil { t.Error("Decoding", name, "failed:", err) } diff --git a/sarama.go b/sarama.go index 862ec0830..cb773f171 100644 --- a/sarama.go +++ b/sarama.go @@ -68,6 +68,10 @@ Consumer related metrics: | Name | Type | Description | +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ | consumer-batch-size | histogram | Distribution of the number of messages in a batch | + | consumer-fetch-rate | meter | Fetch requests/second sent to all brokers | + | consumer-fetch-rate-for-broker- | meter | Fetch requests/second sent to a given broker | + | consumer-fetch-rate-for-topic- | meter | Fetch requests/second sent for a given topic | + | consumer-fetch-response-size | histogram | Distribution of the fetch response size in bytes | | consumer-group-join-total- | counter | Total count of consumer group join attempts | | consumer-group-join-failed- | counter | Total count of consumer group join failures | | consumer-group-sync-total- | counter | Total count of consumer group sync attempts | diff --git a/sync_group_response.go b/sync_group_response.go index 80124e171..41b63b3d0 100644 --- a/sync_group_response.go +++ b/sync_group_response.go @@ -15,7 +15,7 @@ type SyncGroupResponse struct { func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) { assignment := new(ConsumerGroupMemberAssignment) - err := decode(r.MemberAssignment, assignment) + err := decode(r.MemberAssignment, assignment, nil) return assignment, err }