diff --git a/core/internal/httpserver/coordinator.go b/core/internal/httpserver/coordinator.go index 3c36965b..68e7ae18 100644 --- a/core/internal/httpserver/coordinator.go +++ b/core/internal/httpserver/coordinator.go @@ -129,6 +129,8 @@ func (hc *Coordinator) Configure() { // This is a healthcheck URL. Please don't change it hc.router.GET("/burrow/admin", hc.handleAdmin) + hc.router.Handler(http.MethodGet, "/metrics", hc.handlePrometheusMetrics()) + // All valid paths go here hc.router.GET("/v3/kafka", hc.handleClusterList) hc.router.GET("/v3/kafka/:cluster", hc.handleClusterDetail) diff --git a/core/internal/httpserver/prometheus.go b/core/internal/httpserver/prometheus.go new file mode 100644 index 00000000..92456f50 --- /dev/null +++ b/core/internal/httpserver/prometheus.go @@ -0,0 +1,182 @@ +package httpserver + +import ( + "net/http" + "strconv" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/linkedin/Burrow/core/protocol" + + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +var ( + consumerTotalLagGauge = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "burrow_kafka_consumer_lag_total", + Help: "The sum of all partition current lag values for the group", + }, + []string{"cluster", "consumer_group"}, + ) + + consumerStatusGauge = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "burrow_kafka_consumer_status", + Help: "The status of the consumer group. It is calculated from the highest status for the individual partitions. Statuses are an index list from NOTFOUND, OK, WARN, ERR, STOP, STALL, REWIND", + }, + []string{"cluster", "consumer_group"}, + ) + + consumerPartitionCurrentOffset = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "burrow_kafka_consumer_current_offset", + Help: "Latest offset that Burrow is storing for this partition", + }, + []string{"cluster", "consumer_group", "topic", "partition"}, + ) + + consumerPartitionLagGauge = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "burrow_kafka_consumer_partition_lag", + Help: "Number of messages the consumer group is behind by for a partition as reported by Burrow", + }, + []string{"cluster", "consumer_group", "topic", "partition"}, + ) + + topicPartitionOffsetGauge = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "burrow_kafka_topic_partition_offset", + Help: "Latest offset the topic that Burrow is storing for this partition", + }, + []string{"cluster", "topic", "partition"}, + ) +) + +func (hc *Coordinator) handlePrometheusMetrics() http.HandlerFunc { + promHandler := promhttp.Handler() + + return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { + for _, cluster := range listClusters(hc.App) { + for _, consumer := range listConsumers(hc.App, cluster) { + consumerStatus := getFullConsumerStatus(hc.App, cluster, consumer) + + if consumerStatus == nil || + consumerStatus.Status == protocol.StatusNotFound || + consumerStatus.Complete < 1.0 { + continue + } + + labels := map[string]string{ + "cluster": cluster, + "consumer_group": consumer, + } + + consumerTotalLagGauge.With(labels).Set(float64(consumerStatus.TotalLag)) + consumerStatusGauge.With(labels).Set(float64(consumerStatus.Status)) + + for _, partition := range consumerStatus.Partitions { + if partition.Complete < 1.0 { + continue + } + + labels := map[string]string{ + "cluster": cluster, + "consumer_group": consumer, + "topic": partition.Topic, + "partition": strconv.FormatInt(int64(partition.Partition), 10), + } + + consumerPartitionCurrentOffset.With(labels).Set(float64(partition.End.Offset)) + consumerPartitionLagGauge.With(labels).Set(float64(partition.CurrentLag)) + } + } + + // Topics + for _, topic := range listTopics(hc.App, cluster) { + for partitionNumber, offset := range getTopicDetail(hc.App, cluster, topic) { + topicPartitionOffsetGauge.With(map[string]string{ + "cluster": cluster, + "topic": topic, + "partition": strconv.FormatInt(int64(partitionNumber), 10), + }).Set(float64(offset)) + } + } + } + + promHandler.ServeHTTP(resp, req) + }) +} + +func listClusters(app *protocol.ApplicationContext) []string { + request := &protocol.StorageRequest{ + RequestType: protocol.StorageFetchClusters, + Reply: make(chan interface{}), + } + app.StorageChannel <- request + response := <-request.Reply + if response == nil { + return []string{} + } + + return response.([]string) +} + +func listConsumers(app *protocol.ApplicationContext, cluster string) []string { + request := &protocol.StorageRequest{ + RequestType: protocol.StorageFetchConsumers, + Cluster: cluster, + Reply: make(chan interface{}), + } + app.StorageChannel <- request + response := <-request.Reply + if response == nil { + return []string{} + } + + return response.([]string) +} + +func getFullConsumerStatus(app *protocol.ApplicationContext, cluster, consumer string) *protocol.ConsumerGroupStatus { + request := &protocol.EvaluatorRequest{ + Cluster: cluster, + Group: consumer, + ShowAll: true, + Reply: make(chan *protocol.ConsumerGroupStatus), + } + app.EvaluatorChannel <- request + response := <-request.Reply + return response +} + +func listTopics(app *protocol.ApplicationContext, cluster string) []string { + request := &protocol.StorageRequest{ + RequestType: protocol.StorageFetchTopics, + Cluster: cluster, + Reply: make(chan interface{}), + } + app.StorageChannel <- request + response := <-request.Reply + if response == nil { + return []string{} + } + + return response.([]string) +} + +func getTopicDetail(app *protocol.ApplicationContext, cluster, topic string) []int64 { + request := &protocol.StorageRequest{ + RequestType: protocol.StorageFetchTopic, + Cluster: cluster, + Topic: topic, + Reply: make(chan interface{}), + } + app.StorageChannel <- request + response := <-request.Reply + if response == nil { + return []int64{} + } + + return response.([]int64) +} diff --git a/core/internal/httpserver/prometheus_test.go b/core/internal/httpserver/prometheus_test.go new file mode 100644 index 00000000..2b2eed03 --- /dev/null +++ b/core/internal/httpserver/prometheus_test.go @@ -0,0 +1,156 @@ +package httpserver + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/linkedin/Burrow/core/protocol" +) + +func TestHttpServer_handlePrometheusMetrics(t *testing.T) { + coordinator := fixtureConfiguredCoordinator() + + // Respond to the expected storage requests + go func() { + request := <-coordinator.App.StorageChannel + assert.Equalf(t, protocol.StorageFetchClusters, request.RequestType, "Expected request of type StorageFetchClusters, not %v", request.RequestType) + request.Reply <- []string{"testcluster"} + close(request.Reply) + + // List of consumers + request = <-coordinator.App.StorageChannel + assert.Equalf(t, protocol.StorageFetchConsumers, request.RequestType, "Expected request of type StorageFetchConsumers, not %v", request.RequestType) + assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster) + request.Reply <- []string{"testgroup", "testgroup2"} + close(request.Reply) + + // List of topics + request = <-coordinator.App.StorageChannel + assert.Equalf(t, protocol.StorageFetchTopics, request.RequestType, "Expected request of type StorageFetchTopics, not %v", request.RequestType) + assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster) + request.Reply <- []string{"testtopic", "testtopic1"} + close(request.Reply) + + // Topic details + request = <-coordinator.App.StorageChannel + assert.Equalf(t, protocol.StorageFetchTopic, request.RequestType, "Expected request of type StorageFetchTopic, not %v", request.RequestType) + assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster) + assert.Equalf(t, "testtopic", request.Topic, "Expected request Topic to be testtopic, not %v", request.Topic) + request.Reply <- []int64{6556, 5566} + close(request.Reply) + + request = <-coordinator.App.StorageChannel + assert.Equalf(t, protocol.StorageFetchTopic, request.RequestType, "Expected request of type StorageFetchTopic, not %v", request.RequestType) + assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster) + assert.Equalf(t, "testtopic1", request.Topic, "Expected request Topic to be testtopic, not %v", request.Topic) + request.Reply <- []int64{54} + close(request.Reply) + }() + + // Respond to the expected evaluator requests + go func() { + // testgroup happy paths + request := <-coordinator.App.EvaluatorChannel + assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster) + assert.Equalf(t, "testgroup", request.Group, "Expected request Group to be testgroup, not %v", request.Group) + assert.True(t, request.ShowAll, "Expected request ShowAll to be True") + response := &protocol.ConsumerGroupStatus{ + Cluster: request.Cluster, + Group: request.Group, + Status: protocol.StatusOK, + Complete: 1.0, + Partitions: []*protocol.PartitionStatus{ + { + Topic: "testtopic", + Partition: 0, + Status: protocol.StatusOK, + CurrentLag: 100, + Complete: 1.0, + End: &protocol.ConsumerOffset{ + Offset: 22663, + }, + }, + { + Topic: "testtopic", + Partition: 1, + Status: protocol.StatusOK, + CurrentLag: 10, + Complete: 1.0, + End: &protocol.ConsumerOffset{ + Offset: 2488, + }, + }, + { + Topic: "testtopic1", + Partition: 0, + Status: protocol.StatusOK, + CurrentLag: 50, + Complete: 1.0, + End: &protocol.ConsumerOffset{ + Offset: 99888, + }, + }, + { + Topic: "incomplete", + Partition: 0, + Status: protocol.StatusOK, + CurrentLag: 0, + Complete: 0.2, + End: &protocol.ConsumerOffset{ + Offset: 5335, + }, + }, + }, + TotalPartitions: 2134, + Maxlag: &protocol.PartitionStatus{}, + TotalLag: 2345, + } + request.Reply <- response + close(request.Reply) + + // testgroup2 not found + request = <-coordinator.App.EvaluatorChannel + assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster) + assert.Equalf(t, "testgroup2", request.Group, "Expected request Group to be testgroup, not %v", request.Group) + assert.True(t, request.ShowAll, "Expected request ShowAll to be True") + response = &protocol.ConsumerGroupStatus{ + Cluster: request.Cluster, + Group: request.Group, + Status: protocol.StatusNotFound, + } + request.Reply <- response + close(request.Reply) + }() + + // Set up a request + req, err := http.NewRequest("GET", "/metrics", nil) + assert.NoError(t, err, "Expected request setup to return no error") + + // Call the handler via httprouter + rr := httptest.NewRecorder() + coordinator.router.ServeHTTP(rr, req) + + assert.Equalf(t, http.StatusOK, rr.Code, "Expected response code to be 200, not %v", rr.Code) + + promExp := rr.Body.String() + assert.Contains(t, promExp, `burrow_kafka_consumer_status{cluster="testcluster",consumer_group="testgroup"} 1`) + assert.Contains(t, promExp, `burrow_kafka_consumer_lag_total{cluster="testcluster",consumer_group="testgroup"} 2345`) + + assert.Contains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic"} 100`) + assert.Contains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",partition="1",topic="testtopic"} 10`) + assert.Contains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic1"} 50`) + + assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic"} 22663`) + assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="1",topic="testtopic"} 2488`) + assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic1"} 99888`) + + assert.Contains(t, promExp, `burrow_kafka_topic_partition_offset{cluster="testcluster",partition="0",topic="testtopic"} 6556`) + assert.Contains(t, promExp, `burrow_kafka_topic_partition_offset{cluster="testcluster",partition="1",topic="testtopic"} 5566`) + assert.Contains(t, promExp, `burrow_kafka_topic_partition_offset{cluster="testcluster",partition="0",topic="testtopic1"} 54`) + + assert.NotContains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",partition="0",topic="incomplete"} 0`) + assert.NotContains(t, promExp, "testgroup2") +} diff --git a/go.mod b/go.mod index 4e74f402..457a9d1b 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/pelletier/go-toml v1.7.0 // indirect github.com/pierrec/lz4 v2.5.2+incompatible // indirect github.com/pkg/errors v0.9.1 + github.com/prometheus/client_golang v0.9.3 github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da github.com/smartystreets/assertions v1.1.0 // indirect diff --git a/go.sum b/go.sum index b9c2bb0b..63eeb870 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,7 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= @@ -55,6 +56,7 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -111,6 +113,7 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= @@ -141,12 +144,16 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v0.9.3 h1:9iH4JKXLzFbOAdtqv/a+j8aewx2Y8lAjAydhbaScPF8= github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/common v0.4.0 h1:7etb9YClo3a6HjLzfl6rIQaU+FDfi0VSX39io3aQ+DM= github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084 h1:sofwID9zm4tzrgykg80hfFph1mryUeLRsUfoocVVmRY= github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ=