Align tenant pruning according to wall clock.
Pruning a tenant currently acquires a lock on the tenant's TSDB,
which blocks reads from incoming queries. We have noticed spikes in
query latency when tenants get decommissioned, since each receiver
prunes the tenant at a different time.

To reduce the window in which queries are degraded, this commit makes
pruning happen at predictable intervals by aligning it to the wall clock,
similar to how head compaction is aligned.

The commit also changes the tenant deletion condition to look at the duration
since the tenant's minimum block time, rather than since the last append.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
fpetkovski committed Apr 23, 2024
1 parent a96e7f3 commit c499e2f
Showing 3 changed files with 16 additions and 4 deletions.
8 changes: 7 additions & 1 deletion cmd/thanos/receive.go
@@ -418,7 +418,13 @@ func runReceive(
 	{
 		ctx, cancel := context.WithCancel(context.Background())
 		g.Add(func() error {
-			return runutil.Repeat(2*time.Hour, ctx.Done(), func() error {
+			pruneInterval := 2 * time.Duration(tsdbOpts.MaxBlockDuration) * time.Millisecond
+			return runutil.Repeat(1*time.Minute, ctx.Done(), func() error {
+				currentTime := time.Now()
+				currentTotalMinutes := currentTime.Hour()*60 + currentTime.Minute()
+				if currentTotalMinutes%int(pruneInterval.Minutes()) != 0 {
+					return nil
+				}
 				if err := dbs.Prune(ctx); err != nil {
 					level.Error(logger).Log("err", err)
 				}
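For context, here is a minimal, self-contained sketch of the wall-clock alignment check used above. It is not part of the commit: the shouldPrune helper and the sample timestamps are illustrative, and it assumes the default 2h max block duration, which yields a 4h prune interval.

// Sketch (assumption: default 2h max block duration, i.e. a 4h prune interval).
// Every receiver evaluates the same modulo check once a minute, so they all
// run the pruning job within the same one-minute window at 00:00, 04:00, ...
package main

import (
	"fmt"
	"time"
)

// shouldPrune reports whether the current wall-clock minute is aligned to the
// prune interval (hypothetical helper, mirroring the check in the diff above).
func shouldPrune(now time.Time, pruneInterval time.Duration) bool {
	currentTotalMinutes := now.Hour()*60 + now.Minute()
	return currentTotalMinutes%int(pruneInterval.Minutes()) == 0
}

func main() {
	pruneInterval := 4 * time.Hour // 2 * MaxBlockDuration with the 2h default
	for _, ts := range []string{"2024-04-23T03:59:00Z", "2024-04-23T04:00:00Z"} {
		now, _ := time.Parse(time.RFC3339, ts)
		fmt.Printf("%s -> prune=%v\n", ts, shouldPrune(now, pruneInterval))
	}
	// Output:
	// 2024-04-23T03:59:00Z -> prune=false
	// 2024-04-23T04:00:00Z -> prune=true
}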
10 changes: 8 additions & 2 deletions pkg/receive/multitsdb.go
@@ -6,6 +6,7 @@ package receive
 import (
 	"context"
 	"fmt"
+	"math"
 	"os"
 	"path"
 	"path/filepath"
@@ -27,6 +28,7 @@ import (
 	"github.com/prometheus/prometheus/tsdb"
 
 	"github.com/thanos-io/objstore"
+
 	"github.com/thanos-io/thanos/pkg/api/status"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/component"
@@ -334,6 +336,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {
 	if t.tsdbOpts.RetentionDuration == 0 {
 		return nil
 	}
+	level.Info(t.logger).Log("msg", "Running pruning job")
 
 	var (
 		wg sync.WaitGroup
@@ -342,7 +345,6 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {
 		prunedTenants []string
 		pmtx          sync.Mutex
 	)
-
 	t.mtx.RLock()
 	for tenantID, tenantInstance := range t.tenants {
 		wg.Add(1)
@@ -438,7 +440,11 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
 		return false, err
 	}
 
-	if sinceLastAppendMillis <= t.tsdbOpts.RetentionDuration {
+	var tenantMinTimeMillis int64 = math.MaxInt64
+	for _, b := range tdb.Blocks() {
+		tenantMinTimeMillis = min(b.MinTime(), tenantMinTimeMillis)
+	}
+	if time.Since(time.UnixMilli(tenantMinTimeMillis)).Milliseconds() <= t.tsdbOpts.RetentionDuration {
 		return false, nil
 	}
 
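A standalone sketch of the new deletion condition follows. It is not part of the commit; the retention window and block ages are hypothetical. The point it illustrates: a tenant only becomes eligible for pruning once its oldest block falls outside the retention window, not once it merely stops receiving appends.

// Sketch of the min-time-based deletion condition with hypothetical values:
// retention is 15 days and the tenant's oldest block starts 10 days ago, so
// the tenant is kept even though it no longer receives appends.
// Requires Go 1.21+ for the built-in min.
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	retentionMillis := (15 * 24 * time.Hour).Milliseconds()

	// Hypothetical block min-times for a decommissioned tenant.
	blockMinTimes := []time.Time{
		time.Now().Add(-10 * 24 * time.Hour),
		time.Now().Add(-6 * 24 * time.Hour),
	}

	// Take the minimum timestamp across all of the tenant's blocks.
	var tenantMinTimeMillis int64 = math.MaxInt64
	for _, mt := range blockMinTimes {
		tenantMinTimeMillis = min(mt.UnixMilli(), tenantMinTimeMillis)
	}

	expired := time.Since(time.UnixMilli(tenantMinTimeMillis)).Milliseconds() > retentionMillis
	fmt.Println("eligible for deletion:", expired) // false: oldest data is 10d old, retention is 15d
}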
2 changes: 1 addition & 1 deletion pkg/receive/multitsdb_test.go
@@ -455,7 +455,7 @@ func TestMultiTSDBPrune(t *testing.T) {
 	defer func() { testutil.Ok(t, m.Close()) }()
 
 	for i := 0; i < 100; i++ {
-		testutil.Ok(t, appendSample(m, "deleted-tenant", time.UnixMilli(int64(10+i))))
+		testutil.Ok(t, appendSample(m, "deleted-tenant", time.Now().Add(-10*time.Hour)))
 		testutil.Ok(t, appendSample(m, "compacted-tenant", time.Now().Add(-4*time.Hour)))
 		testutil.Ok(t, appendSample(m, "active-tenant", time.Now().Add(time.Duration(i)*time.Second)))
 	}
