Skip to content

Commit

Permalink
No globally registered metrics in pkg/querier (#2210)
Browse files Browse the repository at this point in the history
* No globally registered metrics in pkg/querier

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci committed Mar 5, 2020
1 parent 8a8a5c8 commit 442db4b
Show file tree
Hide file tree
Showing 24 changed files with 277 additions and 99 deletions.
2 changes: 1 addition & 1 deletion cmd/test-exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/weaveworks/common/server"
"github.com/weaveworks/common/tracing"

"github.com/cortexproject/cortex/pkg/querier/correctness"
"github.com/cortexproject/cortex/pkg/testexporter/correctness"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
)
Expand Down
4 changes: 2 additions & 2 deletions integration/e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func NewHTTPService(
}
}

func (s *HTTPService) metrics() (_ string, err error) {
func (s *HTTPService) Metrics() (_ string, err error) {
// Map the container port to the local port
localPort := s.networkPortsContainerToLocal[s.httpPort]

Expand Down Expand Up @@ -522,7 +522,7 @@ func (s *HTTPService) WaitSumMetrics(isExpected func(sums ...float64) bool, metr
sums := make([]float64, len(metricNames))

for s.retryBackoff.Reset(); s.retryBackoff.Ongoing(); {
metrics, err := s.metrics()
metrics, err := s.Metrics()
if err != nil {
return err
}
Expand Down
112 changes: 112 additions & 0 deletions integration/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// +build integration

package main

import (
"strings"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
)

func TestExportedMetrics(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
dynamo := e2edb.NewDynamoDB()
consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(dynamo, consul))

// Start Cortex components.
require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))

tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "")
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "")
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", ChunksStorageFlags, "")
require.NoError(t, s.StartAndWaitReady(distributor, queryFrontend, ingester, tableManager))

// Start the querier after the query-frontend otherwise we're not
// able to get the query-frontend network endpoint.
querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorageFlags, map[string]string{
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), "")
require.NoError(t, s.StartAndWaitReady(querier))

// Wait until the first table-manager sync has completed, so that we're
// sure the tables have been created.
require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_dynamo_sync_tables_seconds"))

// Wait until both the distributor and querier have updated the ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

// Push some series to Cortex (to hit the write path).
now := time.Now()
series, expectedVector := generateSeries("series_1", now)

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// Query the series both from the querier and query-frontend (to hit the read path).
for _, endpoint := range []string{querier.HTTPEndpoint(), queryFrontend.HTTPEndpoint()} {
c, err := e2ecortex.NewClient("", endpoint, "", "user-1")
require.NoError(t, err)

result, err := c.Query("series_1", now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))
}

// For each Cortex service, ensure its service-specific metrics prefix is not used by metrics
// exported by other services.
services := map[*e2ecortex.CortexService][]string{
distributor: []string{},
ingester: []string{},
querier: []string{},
queryFrontend: []string{"cortex_frontend", "cortex_query_frontend"},
tableManager: []string{},
}

for service, prefixes := range services {
if len(prefixes) == 0 {
continue
}

// Assert the prefixes against all other services.
for target, _ := range services {
if service == target {
continue
}

metrics, err := target.Metrics()
require.NoError(t, err)

// Ensure no metric name matches the "reserved" prefixes.
for _, metricLine := range strings.Split(metrics, "\n") {
metricLine = strings.TrimSpace(metricLine)
if metricLine == "" || strings.HasPrefix(metricLine, "#") {
continue
}

for _, prefix := range prefixes {
assert.NotRegexp(t, "^"+prefix, metricLine, "service: %s", target.Name())
}
}
}
}
}
3 changes: 2 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func (t *Cortex) initQueryFrontend(cfg *Config) (serv services.Service, err erro
return
}

t.frontend, err = frontend.New(cfg.Frontend, util.Logger)
t.frontend, err = frontend.New(cfg.Frontend, util.Logger, prometheus.DefaultRegisterer)
if err != nil {
return
}
Expand All @@ -410,6 +410,7 @@ func (t *Cortex) initQueryFrontend(cfg *Config) (serv services.Service, err erro
Timeout: cfg.Querier.Timeout,
},
cfg.Querier.QueryIngestersWithin,
prometheus.DefaultRegisterer,
)

if err != nil {
Expand Down
12 changes: 0 additions & 12 deletions pkg/querier/astmapper/instrumentation.go

This file was deleted.

14 changes: 12 additions & 2 deletions pkg/querier/astmapper/shard_summer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
)
Expand All @@ -29,10 +31,13 @@ type shardSummer struct {
shards int
currentShard *int
squash squasher

// Metrics.
shardedQueries prometheus.Counter
}

// NewShardSummer instantiates an ASTMapper which will fan out sum queries by shard
func NewShardSummer(shards int, squasher squasher) (ASTMapper, error) {
func NewShardSummer(shards int, squasher squasher, registerer prometheus.Registerer) (ASTMapper, error) {
if squasher == nil {
return nil, errors.Errorf("squasher required and not passed")
}
Expand All @@ -41,6 +46,11 @@ func NewShardSummer(shards int, squasher squasher) (ASTMapper, error) {
shards: shards,
squash: squasher,
currentShard: nil,
shardedQueries: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: "cortex",
Name: "frontend_sharded_queries_total",
Help: "Total number of sharded queries",
}),
}), nil
}

Expand Down Expand Up @@ -195,7 +205,7 @@ func (summer *shardSummer) splitSum(
)
}

shardCounter.Add(float64(summer.shards))
summer.shardedQueries.Add(float64(summer.shards))

return parent, children, nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/astmapper/shard_summer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestShardSummer(t *testing.T) {
for i, c := range testExpr {
t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {

summer, err := NewShardSummer(c.shards, orSquasher)
summer, err := NewShardSummer(c.shards, orSquasher, nil)
require.Nil(t, err)
expr, err := promql.ParseExpr(c.input)
require.Nil(t, err)
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestShardSummerWithEncoding(t *testing.T) {
},
} {
t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
summer, err := NewShardSummer(c.shards, VectorSquasher)
summer, err := NewShardSummer(c.shards, VectorSquasher, nil)
require.Nil(t, err)
expr, err := promql.ParseExpr(c.input)
require.Nil(t, err)
Expand Down
5 changes: 3 additions & 2 deletions pkg/querier/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
Expand Down Expand Up @@ -71,7 +72,7 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin
logLevel: logLevel,
bucketStoreMetrics: newTSDBBucketStoreMetrics(),
indexCacheMetrics: newTSDBIndexCacheMetrics(indexCacheRegistry),
syncTimes: prometheus.NewHistogram(prometheus.HistogramOpts{
syncTimes: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_querier_blocks_sync_seconds",
Help: "The total time it takes to perform a sync stores",
Buckets: []float64{0.1, 1, 10, 30, 60, 120, 300, 600, 900},
Expand All @@ -93,7 +94,7 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin
}

if registerer != nil {
registerer.MustRegister(u.syncTimes, u.bucketStoreMetrics, u.indexCacheMetrics)
registerer.MustRegister(u.bucketStoreMetrics, u.indexCacheMetrics)
}

u.Service = services.NewBasicService(u.starting, u.syncStoresLoop, u.stopping)
Expand Down
39 changes: 22 additions & 17 deletions pkg/querier/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,12 @@ import (
"github.com/weaveworks/common/user"
)

var (
queueDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "query_frontend_queue_duration_seconds",
Help: "Time spend by requests queued.",
Buckets: prometheus.DefBuckets,
})
queueLength = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "query_frontend_queue_length",
Help: "Number of queries in the queue.",
})

const (
// StatusClientClosedRequest is the status code for when a client request cancellation of an http request
StatusClientClosedRequest = 499
)

var (
errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests")
errCanceled = httpgrpc.Errorf(StatusClientClosedRequest, context.Canceled.Error())
errDeadlineExceeded = httpgrpc.Errorf(http.StatusGatewayTimeout, context.DeadlineExceeded.Error())
Expand Down Expand Up @@ -72,6 +62,10 @@ type Frontend struct {
mtx sync.Mutex
cond *sync.Cond
queues map[string]chan *request

// Metrics.
queueDuration prometheus.Histogram
queueLength prometheus.Gauge
}

type request struct {
Expand All @@ -85,11 +79,22 @@ type request struct {
}

// New creates a new frontend.
func New(cfg Config, log log.Logger) (*Frontend, error) {
func New(cfg Config, log log.Logger, registerer prometheus.Registerer) (*Frontend, error) {
f := &Frontend{
cfg: cfg,
log: log,
queues: map[string]chan *request{},
queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "query_frontend_queue_duration_seconds",
Help: "Time spend by requests queued.",
Buckets: prometheus.DefBuckets,
}),
queueLength: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "query_frontend_queue_length",
Help: "Number of queries in the queue.",
}),
}
f.cond = sync.NewCond(&f.mtx)

Expand Down Expand Up @@ -331,7 +336,7 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error {

select {
case queue <- req:
queueLength.Add(1)
f.queueLength.Add(1)
f.cond.Broadcast()
return nil
default:
Expand Down Expand Up @@ -385,8 +390,8 @@ FindQueue:
// Tell close() we've processed a request.
f.cond.Broadcast()

queueDuration.Observe(time.Since(request.enqueueTime).Seconds())
queueLength.Add(-1)
f.queueDuration.Observe(time.Since(request.enqueueTime).Seconds())
f.queueLength.Add(-1)
request.queueSpan.Finish()

// Ensure the request has not already expired.
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) {
httpListen, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

frontend, err := New(config, logger)
frontend, err := New(config, logger, nil)
require.NoError(t, err)
defer frontend.Close()

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/frontend/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func setupFrontend(t *testing.T, config Config) *Frontend {
logger := log.NewNopLogger()

frontend, err := New(config, logger)
frontend, err := New(config, logger, nil)
require.NoError(t, err)
defer frontend.Close()
return frontend
Expand Down

0 comments on commit 442db4b

Please sign in to comment.