Switch pkg/ingester to promauto.With(reg) (#2299)
* Switch pkg/ingester to promauto.With(reg)

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Improved integration tests

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Restored previous behaviour when tracking cortex_ingester_sent_bytes_total metric

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Changed newIngesterMetrics() argument name

Signed-off-by: Marco Pracucci <marco@pracucci.com>
pracucci committed Mar 23, 2020
1 parent 0cca64d commit fab8e34
Showing 14 changed files with 251 additions and 208 deletions.
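The core of this change is replacing pkg/ingester's package-level promauto metrics with a per-ingester metrics struct whose collectors are created through promauto.With(registerer). The struct itself (metrics.go) is not among the hunks shown below, so the following is only a minimal sketch of the pattern, using two field names that do appear in the diff (memoryChunks, flushReasons); everything else is an illustrative assumption, not the actual Cortex implementation.

package ingester

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// ingesterMetrics holds per-instance collectors instead of package-level globals,
// so each Ingester registers against its own prometheus.Registerer.
type ingesterMetrics struct {
	memoryChunks prometheus.Gauge
	flushReasons *prometheus.CounterVec
}

// newIngesterMetrics creates and registers the collectors. promauto.With(nil)
// returns usable but unregistered collectors, which keeps callers that pass a
// nil registerer (e.g. some tests) working.
func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
	return &ingesterMetrics{
		memoryChunks: promauto.With(r).NewGauge(prometheus.GaugeOpts{
			Name: "cortex_ingester_memory_chunks",
			Help: "The total number of chunks in memory.",
		}),
		flushReasons: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
			Name: "cortex_ingester_flush_reasons",
			Help: "Total number of series scheduled for flushing, with reasons.",
		}, []string{"reason"}),
	}
}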
7 changes: 6 additions & 1 deletion integration/asserts.go
@@ -28,7 +28,7 @@ var (
// Service-specific metrics prefixes which shouldn't be used by any other service.
serviceMetricsPrefixes = map[ServiceType][]string{
Distributor: []string{},
Ingester: []string{},
Ingester: []string{"!cortex_ingester_client", "cortex_ingester"}, // The metrics prefix cortex_ingester_client may be used by other components so we ignore it.
Querier: []string{},
QueryFrontend: []string{"cortex_frontend", "cortex_query_frontend"},
TableManager: []string{},
@@ -57,6 +57,11 @@ func assertServiceMetricsPrefixes(t *testing.T, serviceType ServiceType, service
}

for _, prefix := range blacklist {
// Skip the metric if it matches an ignored prefix.
if prefix[0] == '!' && strings.HasPrefix(metricLine, prefix[1:]) {
break
}

assert.NotRegexp(t, "^"+prefix, metricLine, "service: %s", service.Name())
}
}
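For context on the asserts.go change above: the blacklist is scanned in order and the loop breaks as soon as an ignored ("!") prefix matches, so an ignored prefix has to be listed before the broader prefix it carves out. A small self-contained sketch of that behaviour (the metric line is made up for illustration and is not part of the commit):

package integration

import (
	"strings"
	"testing"

	"github.com/stretchr/testify/assert"
)

// TestIgnoredPrefixOrdering illustrates why "!cortex_ingester_client" must
// precede "cortex_ingester": the ignore match breaks out of the loop before
// the assertion on the broader prefix can fire.
func TestIgnoredPrefixOrdering(t *testing.T) {
	blacklist := []string{"!cortex_ingester_client", "cortex_ingester"}
	metricLine := "cortex_ingester_client_request_duration_seconds_bucket 12"

	for _, prefix := range blacklist {
		if prefix[0] == '!' && strings.HasPrefix(metricLine, prefix[1:]) {
			break // ignored prefix matched: skip all remaining checks for this metric
		}
		assert.NotRegexp(t, "^"+prefix, metricLine)
	}
}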
3 changes: 2 additions & 1 deletion integration/configs.go
@@ -21,6 +21,7 @@ const (
bucketName = "cortex"
cortexConfigFile = "config.yaml"
cortexSchemaConfigFile = "schema.yaml"
blocksStorageEngine = "tsdb"
storeConfigTemplate = `
- from: {{.From}}
store: {{.IndexStore}}
@@ -61,7 +62,7 @@ var (
}

BlocksStorageFlags = map[string]string{
"-store.engine": "tsdb",
"-store.engine": blocksStorageEngine,
"-experimental.tsdb.backend": "s3",
"-experimental.tsdb.block-ranges-period": "1m",
"-experimental.tsdb.bucket-store.sync-interval": "5s",
5 changes: 4 additions & 1 deletion integration/ingester_flush_test.go
@@ -33,7 +33,7 @@ func TestIngesterFlushWithChunksStorage(t *testing.T) {
require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))

tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
ingester := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorageFlags, map[string]string{
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorageFlags, map[string]string{
"-ingester.max-transfer-retries": "0",
}), "")
querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "")
@@ -62,6 +62,9 @@ func TestIngesterFlushWithChunksStorage(t *testing.T) {
require.Equal(t, 200, res.StatusCode)
}

// Ensure ingester metrics are tracked correctly.
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_chunks_created_total"))

// Query the series.
result, err := c.Query("series_1", now)
require.NoError(t, err)
5 changes: 5 additions & 0 deletions integration/ingester_hand_over_test.go
@@ -75,6 +75,11 @@ func runIngesterHandOverTest(t *testing.T, flags map[string]string, setup func(t
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))

// Ensure 1st ingester metrics are tracked correctly.
if flags["-store.engine"] != blocksStorageEngine {
require.NoError(t, ingester1.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_chunks_created_total"))
}

// Start ingester-2.
ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-ingester.join-after": "10s",
75 changes: 12 additions & 63 deletions pkg/ingester/flush.go
@@ -8,8 +8,6 @@ import (

"github.com/go-kit/kit/log/level"
ot "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

@@ -23,55 +21,6 @@ const (
flushBackoff = 1 * time.Second
)

var (
chunkUtilization = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingester_chunk_utilization",
Help: "Distribution of stored chunk utilization (when stored).",
Buckets: prometheus.LinearBuckets(0, 0.2, 6),
})
chunkLength = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingester_chunk_length",
Help: "Distribution of stored chunk lengths (when stored).",
Buckets: prometheus.ExponentialBuckets(5, 2, 11), // biggest bucket is 5*2^(11-1) = 5120
})
chunkSize = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingester_chunk_size_bytes",
Help: "Distribution of stored chunk sizes (when stored).",
Buckets: prometheus.ExponentialBuckets(500, 2, 5), // biggest bucket is 500*2^(5-1) = 8000
})
chunksPerUser = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_chunks_stored_total",
Help: "Total stored chunks per user.",
}, []string{"user"})
chunkSizePerUser = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_chunk_stored_bytes_total",
Help: "Total bytes stored in chunks per user.",
}, []string{"user"})
chunkAge = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingester_chunk_age_seconds",
Help: "Distribution of chunk ages (when stored).",
// with default settings chunks should flush between 5 min and 12 hours
// so buckets at 1min, 5min, 10min, 30min, 1hr, 2hr, 4hr, 10hr, 12hr, 16hr
Buckets: []float64{60, 300, 600, 1800, 3600, 7200, 14400, 36000, 43200, 57600},
})
memoryChunks = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cortex_ingester_memory_chunks",
Help: "The total number of chunks in memory.",
})
flushReasons = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_flush_reasons",
Help: "Total number of series scheduled for flushing, with reasons.",
}, []string{"reason"})
droppedChunks = promauto.NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_dropped_chunks_total",
Help: "Total number of chunks dropped from flushing because they have too few samples.",
})
oldestUnflushedChunkTimestamp = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cortex_oldest_unflushed_chunk_timestamp_seconds",
Help: "Unix timestamp of the oldest unflushed chunk in the memory",
})
)

// Flush triggers a flush of all the chunks and closes the flush queues.
// Called from the Lifecycler as part of the ingester shutdown.
func (i *Ingester) Flush() {
@@ -133,7 +82,7 @@ func (i *Ingester) sweepUsers(immediate bool) {
}
}

oldestUnflushedChunkTimestamp.Set(float64(oldest.Unix()))
i.metrics.oldestUnflushedChunkTimestamp.Set(float64(oldest.Unix()))
}

type flushReason int8
@@ -186,7 +135,7 @@ func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memo

flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes))
if i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, userID, fp, immediate}) {
flushReasons.WithLabelValues(flush.String()).Inc()
i.metrics.flushReasons.WithLabelValues(flush.String()).Inc()
util.Event().Log("msg", "add to flush queue", "userID", userID, "reason", flush, "firstTime", firstTime, "fp", fp, "series", series.metric, "nlabels", len(series.metric), "queue", flushQueueIndex)
}
}
@@ -308,8 +257,8 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.
}
if chunkLength < minChunkLength {
userState.removeSeries(fp, series.metric)
memoryChunks.Sub(float64(len(chunks)))
droppedChunks.Add(float64(len(chunks)))
i.metrics.memoryChunks.Sub(float64(len(chunks)))
i.metrics.droppedChunks.Add(float64(len(chunks)))
util.Event().Log(
"msg", "dropped chunks",
"userID", userID,
@@ -347,7 +296,7 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.
userState.fpLocker.Lock(fp)
if immediate {
userState.removeSeries(fp, series.metric)
memoryChunks.Sub(float64(len(chunks)))
i.metrics.memoryChunks.Sub(float64(len(chunks)))
} else {
for i := 0; i < len(chunks); i++ {
// mark the chunks as flushed, so we can remove them after the retention period
@@ -366,7 +315,7 @@ func (i *Ingester) removeFlushedChunks(userState *userState, fp model.Fingerprin
if series.chunkDescs[0].flushed && now.Sub(series.chunkDescs[0].LastUpdate) > i.cfg.RetainPeriod {
series.chunkDescs[0] = nil // erase reference so the chunk can be garbage-collected
series.chunkDescs = series.chunkDescs[1:]
memoryChunks.Dec()
i.metrics.memoryChunks.Dec()
} else {
break
}
@@ -390,18 +339,18 @@ func (i *Ingester) flushChunks(ctx context.Context, userID string, fp model.Fing
return err
}

sizePerUser := chunkSizePerUser.WithLabelValues(userID)
countPerUser := chunksPerUser.WithLabelValues(userID)
sizePerUser := i.metrics.chunkSizePerUser.WithLabelValues(userID)
countPerUser := i.metrics.chunksPerUser.WithLabelValues(userID)
// Record statistics only when actual put request did not return error.
for _, chunkDesc := range chunkDescs {
utilization, length, size := chunkDesc.C.Utilization(), chunkDesc.C.Len(), chunkDesc.C.Size()
util.Event().Log("msg", "chunk flushed", "userID", userID, "fp", fp, "series", metric, "nlabels", len(metric), "utilization", utilization, "length", length, "size", size, "firstTime", chunkDesc.FirstTime, "lastTime", chunkDesc.LastTime)
chunkUtilization.Observe(utilization)
chunkLength.Observe(float64(length))
chunkSize.Observe(float64(size))
i.metrics.chunkUtilization.Observe(utilization)
i.metrics.chunkLength.Observe(float64(length))
i.metrics.chunkSize.Observe(float64(size))
sizePerUser.Add(float64(size))
countPerUser.Inc()
chunkAge.Observe(model.Now().Sub(chunkDesc.FirstTime).Seconds())
i.metrics.chunkAge.Observe(model.Now().Sub(chunkDesc.FirstTime).Seconds())
}

return nil
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
@@ -472,7 +472,7 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
})
}

memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks))
i.metrics.memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks))
i.metrics.ingestedSamples.Inc()
switch source {
case client.RULE:
11 changes: 5 additions & 6 deletions pkg/ingester/ingester_v2.go
@@ -13,6 +13,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/gate"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
@@ -98,12 +99,12 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
bucket: bucketClient,
tsdbMetrics: newTSDBMetrics(registerer),

compactionsTriggered: prometheus.NewCounter(prometheus.CounterOpts{
compactionsTriggered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions.",
}),

compactionsFailed: prometheus.NewCounter(prometheus.CounterOpts{
compactionsFailed: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_tsdb_compactions_failed_total",
Help: "Total number of compactions that failed.",
}),
@@ -114,12 +115,10 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
// them from the underlying system (ie. TSDB).
if registerer != nil {
registerer.Unregister(i.metrics.memSeries)
registerer.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_ingester_memory_series",
Help: "The current number of series in memory.",
}, i.numSeriesInTSDB))
registerer.MustRegister(i.TSDBState.compactionsTriggered)
registerer.MustRegister(i.TSDBState.compactionsFailed)
}, i.numSeriesInTSDB)
}

i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true)
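Note on the ingester_v2.go hunk above: collectors built with promauto.With(registerer) register themselves at construction time, which is why the explicit MustRegister calls for the memory-series gauge and the two compaction counters are dropped. The surrounding nil check remains because registerer.Unregister(i.metrics.memSeries) still needs a non-nil registerer, whereas promauto.With(nil) simply returns unregistered collectors.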
