Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No globally registered metrics in pkg/querier #2210

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/test-exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/weaveworks/common/server"
"github.com/weaveworks/common/tracing"

"github.com/cortexproject/cortex/pkg/querier/correctness"
"github.com/cortexproject/cortex/pkg/testexporter/correctness"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
)
Expand Down
4 changes: 2 additions & 2 deletions integration/e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func NewHTTPService(
}
}

func (s *HTTPService) metrics() (_ string, err error) {
func (s *HTTPService) Metrics() (_ string, err error) {
// Map the container port to the local port
localPort := s.networkPortsContainerToLocal[s.httpPort]

Expand Down Expand Up @@ -522,7 +522,7 @@ func (s *HTTPService) WaitSumMetrics(isExpected func(sums ...float64) bool, metr
sums := make([]float64, len(metricNames))

for s.retryBackoff.Reset(); s.retryBackoff.Ongoing(); {
metrics, err := s.metrics()
metrics, err := s.Metrics()
if err != nil {
return err
}
Expand Down
112 changes: 112 additions & 0 deletions integration/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// +build integration

package main
pstibrany marked this conversation as resolved.
Show resolved Hide resolved

import (
"strings"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
)

func TestExportedMetrics(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
dynamo := e2edb.NewDynamoDB()
consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(dynamo, consul))

// Start Cortex components.
require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))

tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "")
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "")
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", ChunksStorageFlags, "")
require.NoError(t, s.StartAndWaitReady(distributor, queryFrontend, ingester, tableManager))

// Start the querier after the query-frontend otherwise we're not
// able to get the query-frontend network endpoint.
querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorageFlags, map[string]string{
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), "")
require.NoError(t, s.StartAndWaitReady(querier))

// Wait until the first table-manager sync has completed, so that we're
// sure the tables have been created.
require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_dynamo_sync_tables_seconds"))

// Wait until both the distributor and querier have updated the ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

// Push some series to Cortex (to hit the write path).
now := time.Now()
series, expectedVector := generateSeries("series_1", now)

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// Query the series both from the querier and query-frontend (to hit the read path).
for _, endpoint := range []string{querier.HTTPEndpoint(), queryFrontend.HTTPEndpoint()} {
c, err := e2ecortex.NewClient("", endpoint, "", "user-1")
require.NoError(t, err)

result, err := c.Query("series_1", now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))
}

// For each Cortex service, ensure its service-specific metrics prefix is not used by metrics
// exported by other services.
services := map[*e2ecortex.CortexService][]string{
distributor: []string{},
ingester: []string{},
querier: []string{},
queryFrontend: []string{"cortex_frontend", "cortex_query_frontend"},
tableManager: []string{},
}

for service, prefixes := range services {
if len(prefixes) == 0 {
continue
}

// Assert the prefixes against all other services.
for target, _ := range services {
if service == target {
continue
}

metrics, err := target.Metrics()
require.NoError(t, err)

// Ensure no metric name matches the "reserved" prefixes.
for _, metricLine := range strings.Split(metrics, "\n") {
metricLine = strings.TrimSpace(metricLine)
if metricLine == "" || strings.HasPrefix(metricLine, "#") {
continue
}

for _, prefix := range prefixes {
assert.NotRegexp(t, "^"+prefix, metricLine, "service: %s", target.Name())
}
}
}
}
}
3 changes: 2 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func (t *Cortex) initQueryFrontend(cfg *Config) (serv services.Service, err erro
return
}

t.frontend, err = frontend.New(cfg.Frontend, util.Logger)
t.frontend, err = frontend.New(cfg.Frontend, util.Logger, prometheus.DefaultRegisterer)
if err != nil {
return
}
Expand All @@ -410,6 +410,7 @@ func (t *Cortex) initQueryFrontend(cfg *Config) (serv services.Service, err erro
Timeout: cfg.Querier.Timeout,
},
cfg.Querier.QueryIngestersWithin,
prometheus.DefaultRegisterer,
)

if err != nil {
Expand Down
12 changes: 0 additions & 12 deletions pkg/querier/astmapper/instrumentation.go

This file was deleted.

14 changes: 12 additions & 2 deletions pkg/querier/astmapper/shard_summer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
)
Expand All @@ -29,10 +31,13 @@ type shardSummer struct {
shards int
currentShard *int
squash squasher

// Metrics.
shardedQueries prometheus.Counter
}

// NewShardSummer instantiates an ASTMapper which will fan out sum queries by shard
func NewShardSummer(shards int, squasher squasher) (ASTMapper, error) {
func NewShardSummer(shards int, squasher squasher, registerer prometheus.Registerer) (ASTMapper, error) {
if squasher == nil {
return nil, errors.Errorf("squasher required and not passed")
}
Expand All @@ -41,6 +46,11 @@ func NewShardSummer(shards int, squasher squasher) (ASTMapper, error) {
shards: shards,
squash: squasher,
currentShard: nil,
shardedQueries: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: "cortex",
Name: "frontend_sharded_queries_total",
Help: "Total number of sharded queries",
}),
}), nil
}

Expand Down Expand Up @@ -195,7 +205,7 @@ func (summer *shardSummer) splitSum(
)
}

shardCounter.Add(float64(summer.shards))
summer.shardedQueries.Add(float64(summer.shards))

return parent, children, nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/astmapper/shard_summer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestShardSummer(t *testing.T) {
for i, c := range testExpr {
t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {

summer, err := NewShardSummer(c.shards, orSquasher)
summer, err := NewShardSummer(c.shards, orSquasher, nil)
require.Nil(t, err)
expr, err := promql.ParseExpr(c.input)
require.Nil(t, err)
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestShardSummerWithEncoding(t *testing.T) {
},
} {
t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
summer, err := NewShardSummer(c.shards, VectorSquasher)
summer, err := NewShardSummer(c.shards, VectorSquasher, nil)
require.Nil(t, err)
expr, err := promql.ParseExpr(c.input)
require.Nil(t, err)
Expand Down
5 changes: 3 additions & 2 deletions pkg/querier/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
Expand Down Expand Up @@ -71,7 +72,7 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin
logLevel: logLevel,
bucketStoreMetrics: newTSDBBucketStoreMetrics(),
indexCacheMetrics: newTSDBIndexCacheMetrics(indexCacheRegistry),
syncTimes: prometheus.NewHistogram(prometheus.HistogramOpts{
syncTimes: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_querier_blocks_sync_seconds",
Help: "The total time it takes to perform a sync stores",
Buckets: []float64{0.1, 1, 10, 30, 60, 120, 300, 600, 900},
Expand All @@ -93,7 +94,7 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin
}

if registerer != nil {
registerer.MustRegister(u.syncTimes, u.bucketStoreMetrics, u.indexCacheMetrics)
registerer.MustRegister(u.bucketStoreMetrics, u.indexCacheMetrics)
}

u.Service = services.NewBasicService(u.starting, u.syncStoresLoop, u.stopping)
Expand Down
39 changes: 22 additions & 17 deletions pkg/querier/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,12 @@ import (
"github.com/weaveworks/common/user"
)

var (
queueDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "query_frontend_queue_duration_seconds",
Help: "Time spend by requests queued.",
Buckets: prometheus.DefBuckets,
})
queueLength = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "query_frontend_queue_length",
Help: "Number of queries in the queue.",
})

const (
// StatusClientClosedRequest is the status code for when a client request cancellation of an http request
StatusClientClosedRequest = 499
)

var (
errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests")
errCanceled = httpgrpc.Errorf(StatusClientClosedRequest, context.Canceled.Error())
errDeadlineExceeded = httpgrpc.Errorf(http.StatusGatewayTimeout, context.DeadlineExceeded.Error())
Expand Down Expand Up @@ -72,6 +62,10 @@ type Frontend struct {
mtx sync.Mutex
cond *sync.Cond
queues map[string]chan *request

// Metrics.
queueDuration prometheus.Histogram
queueLength prometheus.Gauge
}

type request struct {
Expand All @@ -85,11 +79,22 @@ type request struct {
}

// New creates a new frontend.
func New(cfg Config, log log.Logger) (*Frontend, error) {
func New(cfg Config, log log.Logger, registerer prometheus.Registerer) (*Frontend, error) {
f := &Frontend{
cfg: cfg,
log: log,
queues: map[string]chan *request{},
queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "query_frontend_queue_duration_seconds",
Help: "Time spend by requests queued.",
Buckets: prometheus.DefBuckets,
}),
queueLength: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "query_frontend_queue_length",
Help: "Number of queries in the queue.",
}),
}
f.cond = sync.NewCond(&f.mtx)

Expand Down Expand Up @@ -331,7 +336,7 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error {

select {
case queue <- req:
queueLength.Add(1)
f.queueLength.Add(1)
f.cond.Broadcast()
return nil
default:
Expand Down Expand Up @@ -385,8 +390,8 @@ FindQueue:
// Tell close() we've processed a request.
f.cond.Broadcast()

queueDuration.Observe(time.Since(request.enqueueTime).Seconds())
queueLength.Add(-1)
f.queueDuration.Observe(time.Since(request.enqueueTime).Seconds())
f.queueLength.Add(-1)
request.queueSpan.Finish()

// Ensure the request has not already expired.
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) {
httpListen, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

frontend, err := New(config, logger)
frontend, err := New(config, logger, nil)
require.NoError(t, err)
defer frontend.Close()

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/frontend/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func setupFrontend(t *testing.T, config Config) *Frontend {
logger := log.NewNopLogger()

frontend, err := New(config, logger)
frontend, err := New(config, logger, nil)
require.NoError(t, err)
defer frontend.Close()
return frontend
Expand Down