Switch pkg/ingester to promauto.With(reg) #2299

Merged · 4 commits · Mar 23, 2020
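Note for reviewers on the core pattern: promauto.With(reg) returns a factory that creates a metric and registers it on the given prometheus.Registerer in one step, replacing package-level promauto.New* calls that implicitly register on the global default registry. A minimal before/after sketch (the metric name is illustrative, not taken from this diff):

package example

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Before: a package-level metric, registered on the global
// prometheus.DefaultRegisterer as a side effect of package init.
var flushes = promauto.NewCounter(prometheus.CounterOpts{
	Name: "example_flushes_total",
	Help: "Total number of flushes.",
})

// After: the metric is created against an explicit Registerer, so every
// ingester instance (e.g. one per test) gets its own registration. If reg
// is nil, promauto.With(nil) still returns a usable factory; the metric
// simply isn't registered anywhere.
func newFlushCounter(reg prometheus.Registerer) prometheus.Counter {
	return promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "example_flushes_total",
		Help: "Total number of flushes.",
	})
}

One practical payoff: each ingester created in tests can carry its own registry, so metrics no longer collide across instances sharing a process.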
7 changes: 6 additions & 1 deletion integration/asserts.go
@@ -28,7 +28,7 @@ var (
 	// Service-specific metrics prefixes which shouldn't be used by any other service.
 	serviceMetricsPrefixes = map[ServiceType][]string{
 		Distributor:   []string{},
-		Ingester:      []string{},
+		Ingester:      []string{"!cortex_ingester_client", "cortex_ingester"}, // The metrics prefix cortex_ingester_client may be used by other components so we ignore it.
 		Querier:       []string{},
 		QueryFrontend: []string{"cortex_frontend", "cortex_query_frontend"},
 		TableManager:  []string{},
@@ -57,6 +57,11 @@ func assertServiceMetricsPrefixes(t *testing.T, serviceType ServiceType, service
 	}

 	for _, prefix := range blacklist {
+		// Skip the metric if it matches an ignored prefix.
+		if prefix[0] == '!' && strings.HasPrefix(metricLine, prefix[1:]) {
+			break
+		}
+
 		assert.NotRegexp(t, "^"+prefix, metricLine, "service: %s", service.Name())
 	}
 }
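A note on the new "!" marker: blacklist entries are evaluated in order, so an ignore entry like "!cortex_ingester_client" must precede the broader "cortex_ingester" prefix it carves out. A standalone sketch of the semantics (the helper name and shape are assumptions, not the actual test code):

// isAllowed reports whether a metric line is acceptable for a service,
// given a blacklist of prefixes reserved by other services. A leading
// '!' marks an exception that exempts matching metrics from the check.
func isAllowed(metricLine string, blacklist []string) bool {
	for _, prefix := range blacklist {
		if prefix[0] == '!' && strings.HasPrefix(metricLine, prefix[1:]) {
			return true // e.g. cortex_ingester_client_* is shared, so it's fine
		}
		if strings.HasPrefix(metricLine, prefix) {
			return false // uses a prefix reserved by another service
		}
	}
	return true
}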
3 changes: 2 additions & 1 deletion integration/configs.go
@@ -21,6 +21,7 @@ const (
 	bucketName             = "cortex"
 	cortexConfigFile       = "config.yaml"
 	cortexSchemaConfigFile = "schema.yaml"
+	blocksStorageEngine    = "tsdb"
 	storeConfigTemplate = `
 - from: {{.From}}
   store: {{.IndexStore}}
@@ -61,7 +62,7 @@ var (
 	}

 	BlocksStorageFlags = map[string]string{
-		"-store.engine": "tsdb",
+		"-store.engine": blocksStorageEngine,
 		"-experimental.tsdb.backend": "s3",
 		"-experimental.tsdb.block-ranges-period": "1m",
 		"-experimental.tsdb.bucket-store.sync-interval": "5s",
5 changes: 4 additions & 1 deletion integration/ingester_flush_test.go
@@ -33,7 +33,7 @@ func TestIngesterFlushWithChunksStorage(t *testing.T) {
 	require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))

 	tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
-	ingester := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorageFlags, map[string]string{
+	ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorageFlags, map[string]string{
 		"-ingester.max-transfer-retries": "0",
 	}), "")
 	querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "")
@@ -62,6 +62,9 @@ func TestIngesterFlushWithChunksStorage(t *testing.T) {
 		require.Equal(t, 200, res.StatusCode)
 	}

+	// Ensure ingester metrics are tracked correctly.
+	require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_chunks_created_total"))
+
 	// Query the series.
 	result, err := c.Query("series_1", now)
 	require.NoError(t, err)
5 changes: 5 additions & 0 deletions integration/ingester_hand_over_test.go
@@ -75,6 +75,11 @@ func runIngesterHandOverTest(t *testing.T, flags map[string]string, setup func(t
 	require.Equal(t, model.ValVector, result.Type())
 	assert.Equal(t, expectedVector, result.(model.Vector))

+	// Ensure 1st ingester metrics are tracked correctly.
+	if flags["-store.engine"] != blocksStorageEngine {
+		require.NoError(t, ingester1.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_chunks_created_total"))
+	}
+
 	// Start ingester-2.
 	ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
 		"-ingester.join-after": "10s",
75 changes: 12 additions & 63 deletions pkg/ingester/flush.go
@@ -8,8 +8,6 @@ import (

 	"github.com/go-kit/kit/log/level"
 	ot "github.com/opentracing/opentracing-go"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"

@@ -23,55 +21,6 @@ const (
 	flushBackoff = 1 * time.Second
 )

-var (
-	chunkUtilization = promauto.NewHistogram(prometheus.HistogramOpts{
-		Name:    "cortex_ingester_chunk_utilization",
-		Help:    "Distribution of stored chunk utilization (when stored).",
-		Buckets: prometheus.LinearBuckets(0, 0.2, 6),
-	})
-	chunkLength = promauto.NewHistogram(prometheus.HistogramOpts{
-		Name:    "cortex_ingester_chunk_length",
-		Help:    "Distribution of stored chunk lengths (when stored).",
-		Buckets: prometheus.ExponentialBuckets(5, 2, 11), // biggest bucket is 5*2^(11-1) = 5120
-	})
-	chunkSize = promauto.NewHistogram(prometheus.HistogramOpts{
-		Name:    "cortex_ingester_chunk_size_bytes",
-		Help:    "Distribution of stored chunk sizes (when stored).",
-		Buckets: prometheus.ExponentialBuckets(500, 2, 5), // biggest bucket is 500*2^(5-1) = 8000
-	})
-	chunksPerUser = promauto.NewCounterVec(prometheus.CounterOpts{
-		Name: "cortex_ingester_chunks_stored_total",
-		Help: "Total stored chunks per user.",
-	}, []string{"user"})
-	chunkSizePerUser = promauto.NewCounterVec(prometheus.CounterOpts{
-		Name: "cortex_ingester_chunk_stored_bytes_total",
-		Help: "Total bytes stored in chunks per user.",
-	}, []string{"user"})
-	chunkAge = promauto.NewHistogram(prometheus.HistogramOpts{
-		Name: "cortex_ingester_chunk_age_seconds",
-		Help: "Distribution of chunk ages (when stored).",
-		// with default settings chunks should flush between 5 min and 12 hours
-		// so buckets at 1min, 5min, 10min, 30min, 1hr, 2hr, 4hr, 10hr, 12hr, 16hr
-		Buckets: []float64{60, 300, 600, 1800, 3600, 7200, 14400, 36000, 43200, 57600},
-	})
-	memoryChunks = promauto.NewGauge(prometheus.GaugeOpts{
-		Name: "cortex_ingester_memory_chunks",
-		Help: "The total number of chunks in memory.",
-	})
-	flushReasons = promauto.NewCounterVec(prometheus.CounterOpts{
-		Name: "cortex_ingester_flush_reasons",
-		Help: "Total number of series scheduled for flushing, with reasons.",
-	}, []string{"reason"})
-	droppedChunks = promauto.NewCounter(prometheus.CounterOpts{
-		Name: "cortex_ingester_dropped_chunks_total",
-		Help: "Total number of chunks dropped from flushing because they have too few samples.",
-	})
-	oldestUnflushedChunkTimestamp = promauto.NewGauge(prometheus.GaugeOpts{
-		Name: "cortex_oldest_unflushed_chunk_timestamp_seconds",
-		Help: "Unix timestamp of the oldest unflushed chunk in the memory",
-	})
-)
-
 // Flush triggers a flush of all the chunks and closes the flush queues.
 // Called from the Lifecycler as part of the ingester shutdown.
 func (i *Ingester) Flush() {
@@ -133,7 +82,7 @@ func (i *Ingester) sweepUsers(immediate bool) {
 		}
 	}

-	oldestUnflushedChunkTimestamp.Set(float64(oldest.Unix()))
+	i.metrics.oldestUnflushedChunkTimestamp.Set(float64(oldest.Unix()))
 }

 type flushReason int8
@@ -186,7 +135,7 @@ func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memo

 	flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes))
 	if i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, userID, fp, immediate}) {
-		flushReasons.WithLabelValues(flush.String()).Inc()
+		i.metrics.flushReasons.WithLabelValues(flush.String()).Inc()
 		util.Event().Log("msg", "add to flush queue", "userID", userID, "reason", flush, "firstTime", firstTime, "fp", fp, "series", series.metric, "nlabels", len(series.metric), "queue", flushQueueIndex)
 	}
 }
@@ -308,8 +257,8 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.
 	}
 	if chunkLength < minChunkLength {
 		userState.removeSeries(fp, series.metric)
-		memoryChunks.Sub(float64(len(chunks)))
-		droppedChunks.Add(float64(len(chunks)))
+		i.metrics.memoryChunks.Sub(float64(len(chunks)))
+		i.metrics.droppedChunks.Add(float64(len(chunks)))
 		util.Event().Log(
 			"msg", "dropped chunks",
 			"userID", userID,
@@ -347,7 +296,7 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.
 	userState.fpLocker.Lock(fp)
 	if immediate {
 		userState.removeSeries(fp, series.metric)
-		memoryChunks.Sub(float64(len(chunks)))
+		i.metrics.memoryChunks.Sub(float64(len(chunks)))
 	} else {
 		for i := 0; i < len(chunks); i++ {
 			// mark the chunks as flushed, so we can remove them after the retention period
@@ -366,7 +315,7 @@ func (i *Ingester) removeFlushedChunks(userState *userState, fp model.Fingerprin
 		if series.chunkDescs[0].flushed && now.Sub(series.chunkDescs[0].LastUpdate) > i.cfg.RetainPeriod {
 			series.chunkDescs[0] = nil // erase reference so the chunk can be garbage-collected
 			series.chunkDescs = series.chunkDescs[1:]
-			memoryChunks.Dec()
+			i.metrics.memoryChunks.Dec()
 		} else {
 			break
 		}
@@ -390,18 +339,18 @@ func (i *Ingester) flushChunks(ctx context.Context, userID string, fp model.Fing
 		return err
 	}

-	sizePerUser := chunkSizePerUser.WithLabelValues(userID)
-	countPerUser := chunksPerUser.WithLabelValues(userID)
+	sizePerUser := i.metrics.chunkSizePerUser.WithLabelValues(userID)
+	countPerUser := i.metrics.chunksPerUser.WithLabelValues(userID)
 	// Record statistics only when actual put request did not return error.
 	for _, chunkDesc := range chunkDescs {
 		utilization, length, size := chunkDesc.C.Utilization(), chunkDesc.C.Len(), chunkDesc.C.Size()
 		util.Event().Log("msg", "chunk flushed", "userID", userID, "fp", fp, "series", metric, "nlabels", len(metric), "utilization", utilization, "length", length, "size", size, "firstTime", chunkDesc.FirstTime, "lastTime", chunkDesc.LastTime)
-		chunkUtilization.Observe(utilization)
-		chunkLength.Observe(float64(length))
-		chunkSize.Observe(float64(size))
+		i.metrics.chunkUtilization.Observe(utilization)
+		i.metrics.chunkLength.Observe(float64(length))
+		i.metrics.chunkSize.Observe(float64(size))
 		sizePerUser.Add(float64(size))
 		countPerUser.Inc()
-		chunkAge.Observe(model.Now().Sub(chunkDesc.FirstTime).Seconds())
+		i.metrics.chunkAge.Observe(model.Now().Sub(chunkDesc.FirstTime).Seconds())
 	}

 	return nil
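The call sites above now go through i.metrics, a per-ingester metrics holder that is not itself shown in this diff. A sketch of the shape such a holder takes under promauto.With, using three of the metrics moved out of the deleted var block (the struct and constructor names are assumptions):

type ingesterMetrics struct {
	memoryChunks                  prometheus.Gauge
	flushReasons                  *prometheus.CounterVec
	oldestUnflushedChunkTimestamp prometheus.Gauge
}

func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
	return &ingesterMetrics{
		// Creating metrics against the ingester's own Registerer means two
		// ingesters in one process (or test) no longer fight over global
		// registration the way package-level promauto metrics do.
		memoryChunks: promauto.With(r).NewGauge(prometheus.GaugeOpts{
			Name: "cortex_ingester_memory_chunks",
			Help: "The total number of chunks in memory.",
		}),
		flushReasons: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
			Name: "cortex_ingester_flush_reasons",
			Help: "Total number of series scheduled for flushing, with reasons.",
		}, []string{"reason"}),
		oldestUnflushedChunkTimestamp: promauto.With(r).NewGauge(prometheus.GaugeOpts{
			Name: "cortex_oldest_unflushed_chunk_timestamp_seconds",
			Help: "Unix timestamp of the oldest unflushed chunk in the memory",
		}),
	}
}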
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
@@ -472,7 +472,7 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
 		})
 	}

-	memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks))
+	i.metrics.memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks))
 	i.metrics.ingestedSamples.Inc()
 	switch source {
 	case client.RULE:
11 changes: 5 additions & 6 deletions pkg/ingester/ingester_v2.go
@@ -13,6 +13,7 @@ import (
 	"github.com/go-kit/kit/log/level"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/pkg/gate"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/tsdb"
@@ -98,12 +99,12 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
 		bucket:      bucketClient,
 		tsdbMetrics: newTSDBMetrics(registerer),

-		compactionsTriggered: prometheus.NewCounter(prometheus.CounterOpts{
+		compactionsTriggered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
 			Name: "cortex_ingester_tsdb_compactions_triggered_total",
 			Help: "Total number of triggered compactions.",
 		}),

-		compactionsFailed: prometheus.NewCounter(prometheus.CounterOpts{
+		compactionsFailed: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
 			Name: "cortex_ingester_tsdb_compactions_failed_total",
 			Help: "Total number of compactions that failed.",
 		}),
@@ -114,12 +115,10 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
 	// them from the underlying system (ie. TSDB).
 	if registerer != nil {
 		registerer.Unregister(i.metrics.memSeries)
-		registerer.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
 			Name: "cortex_ingester_memory_series",
 			Help: "The current number of series in memory.",
-		}, i.numSeriesInTSDB))
-		registerer.MustRegister(i.TSDBState.compactionsTriggered)
-		registerer.MustRegister(i.TSDBState.compactionsFailed)
+		}, i.numSeriesInTSDB)
 	}

 	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true)
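Two details worth calling out in the NewV2 hunk: promauto.With(registerer).NewGaugeFunc both constructs and registers the gauge, which is why the explicit MustRegister calls disappear; and since promauto.With(nil) returns a factory whose metrics are simply left unregistered, the compactions counters are safe to build even when no registerer is passed. A small self-contained illustration (metric names are placeholders, not from this diff):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	// With a nil Registerer the counter still works; it just isn't
	// registered anywhere, which is handy in tests.
	c := promauto.With(nil).NewCounter(prometheus.CounterOpts{
		Name: "demo_compactions_triggered_total",
		Help: "Total number of triggered compactions.",
	})
	c.Inc()

	// NewGaugeFunc registers a gauge whose value is pulled from the
	// callback at scrape time; no separate MustRegister call is needed.
	reg := prometheus.NewRegistry()
	promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
		Name: "demo_memory_series",
		Help: "The current number of series in memory.",
	}, func() float64 { return 42 })

	mfs, _ := reg.Gather()
	fmt.Println(len(mfs)) // 1: only the gauge func is registered on reg
}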