Backfilling the new limits when updating the metrics (#5955)
* Backfilling the new limits when updating the metrics

Signed-off-by: alanprot <alanprot@gmail.com>

* make clean-white-noise

Signed-off-by: alanprot <alanprot@gmail.com>

---------

Signed-off-by: alanprot <alanprot@gmail.com>
alanprot committed May 16, 2024
1 parent be4dd02 commit 6be4779
Showing 5 changed files with 114 additions and 66 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@
* [ENHANCEMENT] Distributor/Querier: Clean stale per-ingester metrics after ingester restarts. #5930
* [ENHANCEMENT] Distributor/Ring: Allow disabling detailed ring metrics by ring member. #5931
* [ENHANCEMENT] KV: Etcd Added etcd.ping-without-stream-allowed parameter to disable/enable PermitWithoutStream #5933
* [ENHANCEMENT] Ingester: Add a new `max_series_per_label_set` limit. This limit functions similarly to `max_series_per_metric`, but allows users to define the maximum number of series per LabelSet. #5950
* [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906
* [CHANGE] Query Frontend/Ruler: Omit empty data field in API response. #5953 #5954
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
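The `max_series_per_label_set` limit introduced in the changelog entry above caps the number of series that share a configured label set, much like `max_series_per_metric` caps series per metric name. A minimal, self-contained sketch of the idea (the type and field names below are illustrative only, not the Cortex configuration or API):

```go
package main

import "fmt"

// labelSetLimit is an illustrative stand-in for one max_series_per_label_set
// entry: every series whose labels include all of the matchers shares one budget.
type labelSetLimit struct {
	matchers map[string]string // label name -> required value
	limit    int
}

// matches reports whether the series carries every label pair in the limit's label set.
func matches(series map[string]string, l labelSetLimit) bool {
	for name, value := range l.matchers {
		if series[name] != value {
			return false
		}
	}
	return true
}

func main() {
	limit := labelSetLimit{matchers: map[string]string{"label1": "value1"}, limit: 2}
	active := 0

	for _, series := range []map[string]string{
		{"__name__": "metric_a", "label1": "value1"},
		{"__name__": "metric_b", "label1": "value1"},
		{"__name__": "metric_c", "label1": "value1"}, // third matching series exceeds the shared budget
		{"__name__": "metric_d", "label1": "other"},  // does not match, not counted against the limit
	} {
		if !matches(series, limit) {
			continue
		}
		if active >= limit.limit {
			fmt.Printf("rejecting %s: per-labelset series limit (%d) reached\n", series["__name__"], limit.limit)
			continue
		}
		active++
	}
	fmt.Printf("active series under the label set: %d\n", active)
}
```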
37 changes: 22 additions & 15 deletions pkg/ingester/ingester.go
@@ -877,7 +877,7 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
i.stoppedMtx.RUnlock()

case <-activeSeriesTickerChan:
i.updateActiveSeries()
i.updateActiveSeries(ctx)
case <-maxInflightRequestResetTicker.C:
i.maxInflightQueryRequests.Tick()
case <-userTSDBConfigTicker.C:
@@ -929,7 +929,7 @@ func (i *Ingester) getMaxExemplars(userID string) int64 {
return int64(maxExemplarsFromLimits)
}

func (i *Ingester) updateActiveSeries() {
func (i *Ingester) updateActiveSeries(ctx context.Context) {
purgeTime := time.Now().Add(-i.cfg.ActiveSeriesMetricsIdleTimeout)

for _, userID := range i.getTSDBUsers() {
@@ -940,7 +940,9 @@ func (i *Ingester) updateActiveSeries() {

userDB.activeSeries.Purge(purgeTime)
i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(userDB.activeSeries.Active()))
userDB.labelSetCounter.UpdateMetric(userDB, i.metrics.activeSeriesPerLabelSet)
if err := userDB.labelSetCounter.UpdateMetric(ctx, userDB, i.metrics.activeSeriesPerLabelSet); err != nil {
level.Warn(i.logger).Log("msg", "failed to update per labelSet metrics", "user", userID, "err", err)
}
}
}

@@ -1054,18 +1056,19 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
// Keep track of some stats which are tracked only if the samples will be
// successfully committed
var (
succeededSamplesCount = 0
failedSamplesCount = 0
succeededExemplarsCount = 0
failedExemplarsCount = 0
startAppend = time.Now()
sampleOutOfBoundsCount = 0
sampleOutOfOrderCount = 0
sampleTooOldCount = 0
newValueForTimestampCount = 0
perUserSeriesLimitCount = 0
perMetricSeriesLimitCount = 0
nativeHistogramCount = 0
succeededSamplesCount = 0
failedSamplesCount = 0
succeededExemplarsCount = 0
failedExemplarsCount = 0
startAppend = time.Now()
sampleOutOfBoundsCount = 0
sampleOutOfOrderCount = 0
sampleTooOldCount = 0
newValueForTimestampCount = 0
perUserSeriesLimitCount = 0
perLabelSetSeriesLimitCount = 0
perMetricSeriesLimitCount = 0
nativeHistogramCount = 0

updateFirstPartial = func(errFn func() error) {
if firstPartialErr == nil {
@@ -1150,6 +1153,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
})
continue
case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
perLabelSetSeriesLimitCount++
updateFirstPartial(func() error {
return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
})
@@ -1245,6 +1249,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
if perMetricSeriesLimitCount > 0 {
validation.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID).Add(float64(perMetricSeriesLimitCount))
}
if perLabelSetSeriesLimitCount > 0 {
validation.DiscardedSamples.WithLabelValues(perLabelsetSeriesLimit, userID).Add(float64(perLabelSetSeriesLimitCount))
}

if nativeHistogramCount > 0 {
validation.DiscardedSamples.WithLabelValues(nativeHistogramSample, userID).Add(float64(nativeHistogramCount))
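In the Push changes above, rejections are counted in a local `perLabelSetSeriesLimitCount` inside the append loop and the shared `cortex_discarded_samples_total` counter is incremented once per reason after the loop. A small standalone sketch of that pattern using prometheus/client_golang (the metric name, reason, and user label below are illustrative, not the Cortex internals):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Illustrative counter; Cortex keeps its equivalent in the validation package.
	discardedSamples := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_discarded_samples_total",
		Help: "Samples discarded, partitioned by reason and user.",
	}, []string{"reason", "user"})

	// Count rejections locally while processing a write request...
	perLabelSetSeriesLimitCount := 0
	for _, rejected := range []bool{true, false, true} {
		if rejected {
			perLabelSetSeriesLimitCount++
		}
	}

	// ...and touch the metric only once per reason after the loop.
	if perLabelSetSeriesLimitCount > 0 {
		discardedSamples.WithLabelValues("per_labelset_series_limit", "user-1").
			Add(float64(perLabelSetSeriesLimitCount))
	}

	fmt.Println(testutil.ToFloat64(
		discardedSamples.WithLabelValues("per_labelset_series_limit", "user-1"))) // 2
}
```

Batching the increments this way keeps the hot append loop from touching the counter vector once per rejected sample.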
52 changes: 37 additions & 15 deletions pkg/ingester/ingester_test.go
@@ -109,6 +109,7 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
require.NoError(t, os.Mkdir(blocksDir, os.ModePerm))

ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, tenantLimits, blocksDir, registry)
registry.MustRegister(validation.DiscardedSamples)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
// Wait until it's ACTIVE
@@ -132,13 +133,13 @@
}
}

ing.updateActiveSeries()
ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
# TYPE cortex_ingester_active_series_per_labelset gauge
cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3
cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2
`), "cortex_ingester_active_series_per_labelset"))
`), "cortex_ingester_active_series_per_labelset", "cortex_discarded_samples_total"))

// Should impose limits
for _, set := range limits.MaxSeriesPerLabelSet {
@@ -154,13 +155,16 @@
require.ErrorContains(t, err, set.Id)
}

ing.updateActiveSeries()
ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{reason="per_labelset_series_limit",user="1"} 2
# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
# TYPE cortex_ingester_active_series_per_labelset gauge
cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3
cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2
`), "cortex_ingester_active_series_per_labelset"))
`), "cortex_ingester_active_series_per_labelset", "cortex_discarded_samples_total"))

// Should apply composite limits
limits.MaxSeriesPerLabelSet = append(limits.MaxSeriesPerLabelSet,
@@ -187,6 +191,21 @@
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)

// Should backfill
ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{reason="per_labelset_series_limit",user="1"} 2
# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
# TYPE cortex_ingester_active_series_per_labelset gauge
cortex_ingester_active_series_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",user="1"} 0
cortex_ingester_active_series_per_labelset{labelset="{comp1=\"compValue1\"}",user="1"} 0
cortex_ingester_active_series_per_labelset{labelset="{comp2=\"compValue2\"}",user="1"} 0
cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3
cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2
`), "cortex_ingester_active_series_per_labelset", "cortex_discarded_samples_total"))

// Adding 5 metrics with only 1 label
for i := 0; i < 5; i++ {
lbls := []string{labels.MetricName, "metric_name", "comp1", "compValue1"}
@@ -211,16 +230,19 @@
assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
require.ErrorContains(t, err, labels.FromStrings("comp1", "compValue1", "comp2", "compValue2").String())

ing.updateActiveSeries()
ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{reason="per_labelset_series_limit",user="1"} 3
# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
# TYPE cortex_ingester_active_series_per_labelset gauge
cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3
cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2
cortex_ingester_active_series_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",user="1"} 2
cortex_ingester_active_series_per_labelset{labelset="{comp1=\"compValue1\"}",user="1"} 7
cortex_ingester_active_series_per_labelset{labelset="{comp2=\"compValue2\"}",user="1"} 2
`), "cortex_ingester_active_series_per_labelset"))
`), "cortex_ingester_active_series_per_labelset", "cortex_discarded_samples_total"))

// Should bootstrap and apply limits when configuration change
limits.MaxSeriesPerLabelSet = append(limits.MaxSeriesPerLabelSet,
@@ -249,7 +271,7 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
require.ErrorContains(t, err, labels.FromStrings(lbls...).String())

ing.updateActiveSeries()
ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
# TYPE cortex_ingester_active_series_per_labelset gauge
@@ -267,7 +289,7 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
require.NoError(t, err)
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)
ing.updateActiveSeries()
ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
# TYPE cortex_ingester_active_series_per_labelset gauge
@@ -281,7 +303,7 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
ing, err = prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, tenantLimits, blocksDir, registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
ing.updateActiveSeries()
ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
# TYPE cortex_ingester_active_series_per_labelset gauge
@@ -1207,7 +1229,7 @@ func TestIngester_Push(t *testing.T) {

// Update active series for metrics check.
if !testData.disableActiveSeries {
i.updateActiveSeries()
i.updateActiveSeries(ctx)
}

// Append additional metrics to assert on.
@@ -1274,7 +1296,7 @@ func TestIngester_Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *testi
}

// Update active series for metrics check.
i.updateActiveSeries()
i.updateActiveSeries(context.Background())

// Check tracked Prometheus metrics
expectedMetrics := `
@@ -1361,7 +1383,7 @@ func TestIngester_Push_DecreaseInactiveSeries(t *testing.T) {
time.Sleep(200 * time.Millisecond)

// Update active series for metrics check. This will remove inactive series.
i.updateActiveSeries()
i.updateActiveSeries(context.Background())

// Check tracked Prometheus metrics
expectedMetrics := `
@@ -3733,7 +3755,7 @@ func TestIngesterCompactAndCloseIdleTSDB(t *testing.T) {
})

pushSingleSampleWithMetadata(t, i)
i.updateActiveSeries()
i.updateActiveSeries(context.Background())

require.Equal(t, int64(1), i.TSDBState.seriesCount.Load())

@@ -3774,7 +3796,7 @@ func TestIngesterCompactAndCloseIdleTSDB(t *testing.T) {
})

require.Greater(t, testutil.ToFloat64(i.TSDBState.idleTsdbChecks.WithLabelValues(string(tsdbIdleClosed))), float64(0))
i.updateActiveSeries()
i.updateActiveSeries(context.Background())
require.Equal(t, int64(0), i.TSDBState.seriesCount.Load()) // Flushing removed all series from memory.

// Verify that user has disappeared from metrics.
@@ -3799,7 +3821,7 @@

// Pushing another sample will recreate TSDB.
pushSingleSampleWithMetadata(t, i)
i.updateActiveSeries()
i.updateActiveSeries(context.Background())

// User is back.
require.NoError(t, testutil.GatherAndCompare(r, strings.NewReader(`
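The assertions in the test above rely on `testutil.GatherAndCompare`, which gathers the registered metrics in Prometheus exposition format and diffs them against an expected snippet, restricted to the metric names passed as trailing arguments. A standalone sketch of that pattern (the metric name and values here are made up for illustration):

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	registry := prometheus.NewRegistry()
	activeSeries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "example_active_series_per_labelset",
		Help: "Active series per user and labelset.",
	}, []string{"labelset", "user"})
	registry.MustRegister(activeSeries)

	activeSeries.WithLabelValues(`{label1="value1"}`, "1").Set(3)

	expected := `# HELP example_active_series_per_labelset Active series per user and labelset.
# TYPE example_active_series_per_labelset gauge
example_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3
`
	// Only the named metrics are compared, so unrelated metrics in the
	// registry cannot make the assertion fail.
	if err := testutil.GatherAndCompare(registry, strings.NewReader(expected),
		"example_active_series_per_labelset"); err != nil {
		fmt.Println("mismatch:", err)
		return
	}
	fmt.Println("metrics match")
}
```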
2 changes: 2 additions & 0 deletions pkg/ingester/limiter.go
@@ -105,6 +105,8 @@ func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int
return errMaxMetadataPerUserLimitExceeded
}

// AssertMaxSeriesPerLabelSet limit has not been reached compared to the current
// number of series for the matched label sets and returns an error if so.
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.MaxSeriesPerLabelSet) (int, error)) error {
m := l.maxSeriesPerLabelSet(userID, metric)
for _, limit := range m {
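AssertMaxSeriesPerLabelSet takes a callback that reports the current series count for each matched limit, so the limiter itself stays free of TSDB details. A standalone sketch of that inversion-of-control shape (the types and names below are illustrative, not the Cortex API):

```go
package main

import "fmt"

// maxSeriesPerLabelSet mirrors, for this sketch only, the shape of a per-labelset limit.
type maxSeriesPerLabelSet struct {
	id    string
	limit int
}

// assertMaxSeriesPerLabelSet checks every limit that matches the series being added.
// The caller supplies the counting function, so the limiter needs no knowledge of
// how series are stored (in Cortex the count comes from the labelSetCounter).
func assertMaxSeriesPerLabelSet(limits []maxSeriesPerLabelSet, count func(maxSeriesPerLabelSet) (int, error)) error {
	for _, l := range limits {
		existing, err := count(l)
		if err != nil {
			return err
		}
		if existing >= l.limit {
			return fmt.Errorf("per-labelset series limit exceeded for %q (%d >= %d)", l.id, existing, l.limit)
		}
	}
	return nil
}

func main() {
	limits := []maxSeriesPerLabelSet{{id: `{label1="value1"}`, limit: 2}}
	currentCounts := map[string]int{`{label1="value1"}`: 2}

	err := assertMaxSeriesPerLabelSet(limits, func(l maxSeriesPerLabelSet) (int, error) {
		return currentCounts[l.id], nil
	})
	fmt.Println(err)
}
```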
88 changes: 52 additions & 36 deletions pkg/ingester/user_state.go
@@ -125,45 +125,49 @@ func (m *labelSetCounter) canAddSeriesForLabelSet(ctx context.Context, u *userTS
s.RUnlock()

// We still don't keep track of this label set so we need to backfill
ir, err := u.db.Head().Index()
if err != nil {
return 0, err
}
return m.backFillLimit(ctx, u, set, s)
})
}

defer ir.Close()
func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit validation.MaxSeriesPerLabelSet, s *labelSetCounterShard) (int, error) {
ir, err := u.db.Head().Index()
if err != nil {
return 0, err
}

s.Lock()
defer s.Unlock()
if r, ok := s.valuesCounter[set.Hash]; !ok {
postings := make([]index.Postings, 0, len(set.LabelSet))
for _, lbl := range set.LabelSet {
p, err := ir.Postings(ctx, lbl.Name, lbl.Value)
if err != nil {
return 0, err
}
postings = append(postings, p)
defer ir.Close()

s.Lock()
defer s.Unlock()
if r, ok := s.valuesCounter[limit.Hash]; !ok {
postings := make([]index.Postings, 0, len(limit.LabelSet))
for _, lbl := range limit.LabelSet {
p, err := ir.Postings(ctx, lbl.Name, lbl.Value)
if err != nil {
return 0, err
}
postings = append(postings, p)
}

p := index.Intersect(postings...)
p := index.Intersect(postings...)

totalCount := 0
for p.Next() {
totalCount++
}
totalCount := 0
for p.Next() {
totalCount++
}

if p.Err() != nil {
return 0, p.Err()
}
if p.Err() != nil {
return 0, p.Err()
}

s.valuesCounter[set.Hash] = &labelSetCounterEntry{
count: totalCount,
labels: set.LabelSet,
}
return totalCount, nil
} else {
return r.count, nil
s.valuesCounter[limit.Hash] = &labelSetCounterEntry{
count: totalCount,
labels: limit.LabelSet,
}
})
return totalCount, nil
} else {
return r.count, nil
}
}

func (m *labelSetCounter) increaseSeriesLabelSet(u *userTSDB, metric labels.Labels) {
@@ -195,24 +199,36 @@ func (m *labelSetCounter) decreaseSeriesLabelSet(u *userTSDB, metric labels.Labe
}
}

func (m *labelSetCounter) UpdateMetric(u *userTSDB, vec *prometheus.GaugeVec) {
currentLbsLimitHash := map[uint64]struct{}{}
func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, vec *prometheus.GaugeVec) error {
currentLbsLimitHash := map[uint64]validation.MaxSeriesPerLabelSet{}
for _, l := range m.limiter.limits.MaxSeriesPerLabelSet(u.userID) {
currentLbsLimitHash[l.Hash] = struct{}{}
currentLbsLimitHash[l.Hash] = l
}

for i := 0; i < numMetricCounterShards; i++ {
s := m.shards[i]
s.RLock()
for h, entry := range s.valuesCounter {
// This limit no longer ecists
// This limit no longer exists
if _, ok := currentLbsLimitHash[h]; !ok {
vec.DeleteLabelValues(u.userID, entry.labels.String())
continue
}

delete(currentLbsLimitHash, h)
vec.WithLabelValues(u.userID, entry.labels.String()).Set(float64(entry.count))
}
s.RUnlock()
}

// Backfill all limits that are not being tracked yet
for _, l := range currentLbsLimitHash {
s := m.shards[util.HashFP(model.Fingerprint(l.Hash))%numMetricCounterShards]
count, err := m.backFillLimit(ctx, u, l, s)
if err != nil {
return err
}
vec.WithLabelValues(u.userID, l.LabelSet.String()).Set(float64(count))
}

return nil
}
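The extracted backFillLimit above seeds the per-labelset counter from the TSDB head: it fetches the postings for each name/value pair in the configured label set, intersects them, and uses the size of the intersection as the starting count. A self-contained sketch of that counting step (plain sorted int slices stand in for the Prometheus index.Postings iterators):

```go
package main

import "fmt"

// intersect returns the IDs present in both sorted slices, mimicking
// index.Intersect over two postings iterators.
func intersect(a, b []int) []int {
	out := []int{}
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i] == b[j]:
			out = append(out, a[i])
			i++
			j++
		case a[i] < b[j]:
			i++
		default:
			j++
		}
	}
	return out
}

func main() {
	// postings["name=value"] -> sorted series IDs that carry that label pair.
	postings := map[string][]int{
		"comp1=compValue1": {1, 2, 3, 5, 8},
		"comp2=compValue2": {2, 3, 8, 9},
	}
	labelSet := []string{"comp1=compValue1", "comp2=compValue2"}

	// Intersect the postings of every pair in the label set; the remaining
	// IDs are exactly the series the limit applies to.
	current := postings[labelSet[0]]
	for _, key := range labelSet[1:] {
		current = intersect(current, postings[key])
	}
	fmt.Printf("backfilled series count for the label set: %d (series %v)\n", len(current), current)
}
```

UpdateMetric reuses the same helper for any configured limit that is not yet tracked, which is why the test above expects new gauge entries (with a value of 0 or the backfilled count) to appear right after a configuration change.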
