Skip to content

Commit

Permalink
Treat interrupted compactions differently from failures (#4921)
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Lyons <lyonssp777@gmail.com>

Signed-off-by: Sean Lyons <lyonssp777@gmail.com>
  • Loading branch information
lyonssp committed Oct 24, 2022
1 parent 6a48d3c commit 6249935
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* [CHANGE] Disables TSDB isolation. #4825
* [CHANGE] Drops support Prometheus 1.x rule format on configdb. #4826
* [CHANGE] Removes `-ingester.stream-chunks-when-using-blocks` experimental flag and stream chunks by default when `querier.ingester-streaming` is enabled. #4864
* [CHANGE] Compactor: Added the `cortex_compactor_runs_interrupted_total` metric to separate compaction interruptions from failures. #4921
* [ENHANCEMENT] AlertManager: Retrying AlertManager Get Requests (Get Alertmanager status, Get Alertmanager Receivers) on next replica on error #4840
* [ENHANCEMENT] Querier/Ruler: Retry store-gateway in case of unexpected failure, instead of failing the query. #4532 #4839
* [ENHANCEMENT] Ring: DoBatch prioritize 4xx errors when failing. #4783
Expand Down
35 changes: 27 additions & 8 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ type Compactor struct {

// Metrics.
compactionRunsStarted prometheus.Counter
compactionRunsInterrupted prometheus.Counter
compactionRunsCompleted prometheus.Counter
compactionRunsFailed prometheus.Counter
compactionRunsLastSuccess prometheus.Gauge
Expand Down Expand Up @@ -399,6 +400,10 @@ func newCompactor(
Name: "cortex_compactor_runs_started_total",
Help: "Total number of compaction runs started.",
}),
compactionRunsInterrupted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_runs_interrupted_total",
Help: "Total number of compaction runs interrupted.",
}),
compactionRunsCompleted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_runs_completed_total",
Help: "Total number of compaction runs successfully completed.",
Expand Down Expand Up @@ -598,16 +603,23 @@ func (c *Compactor) running(ctx context.Context) error {
}

func (c *Compactor) compactUsers(ctx context.Context) {
succeeded := false
compactionErrorCount := 0
failed := false
interrupted := false

c.compactionRunsStarted.Inc()

defer func() {
if succeeded && compactionErrorCount == 0 {
// interruptions and successful runs are considered
// mutually exclusive but we consider a run failed if any
// tenant runs failed even if later runs are interrupted
if !interrupted && !failed {
c.compactionRunsCompleted.Inc()
c.compactionRunsLastSuccess.SetToCurrentTime()
} else {
}
if interrupted {
c.compactionRunsInterrupted.Inc()
}
if failed {
c.compactionRunsFailed.Inc()
}

Expand All @@ -621,6 +633,7 @@ func (c *Compactor) compactUsers(ctx context.Context) {
level.Info(c.logger).Log("msg", "discovering users from bucket")
users, err := c.discoverUsersWithRetries(ctx)
if err != nil {
failed = true
level.Error(c.logger).Log("msg", "failed to discover users from bucket", "err", err)
return
}
Expand All @@ -640,7 +653,8 @@ func (c *Compactor) compactUsers(ctx context.Context) {
for _, userID := range users {
// Ensure the context has not been canceled (ie. compactor shutdown has been triggered).
if ctx.Err() != nil {
level.Info(c.logger).Log("msg", "interrupting compaction of user blocks", "err", err)
interrupted = true
level.Info(c.logger).Log("msg", "interrupting compaction of user blocks", "user", userID)
return
}

Expand Down Expand Up @@ -670,8 +684,15 @@ func (c *Compactor) compactUsers(ctx context.Context) {
level.Info(c.logger).Log("msg", "starting compaction of user blocks", "user", userID)

if err = c.compactUserWithRetries(ctx, userID); err != nil {
// TODO: patch thanos error types to support errors.Is(err, context.Canceled) here
if ctx.Err() != nil && ctx.Err() == context.Canceled {
interrupted = true
level.Info(c.logger).Log("msg", "interrupting compaction of user blocks", "user", userID)
return
}

c.compactionRunFailedTenants.Inc()
compactionErrorCount++
failed = true
level.Error(c.logger).Log("msg", "failed to compact user blocks", "user", userID, "err", err)
continue
}
Expand Down Expand Up @@ -706,8 +727,6 @@ func (c *Compactor) compactUsers(ctx context.Context) {
}
}
}

succeeded = true
}

func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) error {
Expand Down
61 changes: 61 additions & 0 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1796,3 +1796,64 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) {
`level=error component=compactor msg="compactor failed to become ACTIVE in the ring" err="context deadline exceeded"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))
}

// TestCompactor_ShouldNotTreatInterruptionsAsErrors verifies that a compaction
// run whose context is cancelled mid-flight is accounted for as an
// interruption: the interrupted counter reaches 1, the completed and failed
// counters stay at 0, and nothing is logged at error level.
func TestCompactor_ShouldNotTreatInterruptionsAsErrors(t *testing.T) {
	const tenantID = "user-1"

	// Expected exposition of the three run-outcome counters once the
	// interrupted run has been recorded.
	const wantMetrics = `
# TYPE cortex_compactor_runs_completed_total counter
# HELP cortex_compactor_runs_completed_total Total number of compaction runs successfully completed.
cortex_compactor_runs_completed_total 0
# TYPE cortex_compactor_runs_interrupted_total counter
# HELP cortex_compactor_runs_interrupted_total Total number of compaction runs interrupted.
cortex_compactor_runs_interrupted_total 1
# TYPE cortex_compactor_runs_failed_total counter
# HELP cortex_compactor_runs_failed_total Total number of compaction runs failed.
cortex_compactor_runs_failed_total 0
`

	// Seed the in-memory bucket with one meta.json and two TSDB blocks for the tenant.
	bkt := objstore.NewInMemBucket()
	metaID := ulid.MustNew(ulid.Now(), rand.Reader)
	metaPath := tenantID + "/" + metaID.String() + "/meta.json"
	uploadErr := bkt.Upload(context.Background(), metaPath, strings.NewReader(mockBlockMetaJSON(metaID.String())))
	require.NoError(t, uploadErr)

	firstBlock := createTSDBBlock(t, bkt, tenantID, 10, 20, map[string]string{"__name__": "Teste"})
	secondBlock := createTSDBBlock(t, bkt, tenantID, 20, 30, map[string]string{"__name__": "Teste"})

	comp, compactorMock, plannerMock, logs, registry := prepare(t, prepareConfig(), bkt, nil)

	// The mocked Compact call cancels the context it is running under and
	// reports context.Canceled, which is the interruption being tested.
	ctx, cancel := context.WithCancel(context.Background())
	compactorMock.
		On("Compact", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
		Return(ulid.ULID{}, context.Canceled).
		Run(func(args mock.Arguments) {
			cancel()
		})

	// The planner always proposes both blocks.
	plannedBlocks := []*metadata.Meta{
		{BlockMeta: tsdb.BlockMeta{ULID: firstBlock, MinTime: 10, MaxTime: 20}},
		{BlockMeta: tsdb.BlockMeta{ULID: secondBlock, MinTime: 20, MaxTime: 30}},
	}
	plannerMock.On("Plan", mock.Anything, mock.Anything).Return(plannedBlocks, nil)

	require.NoError(t, services.StartAndAwaitRunning(ctx, comp))

	// Wait until exactly one interrupted run has been counted.
	interruptedRuns := func() interface{} {
		return prom_testutil.ToFloat64(comp.compactionRunsInterrupted)
	}
	cortex_testutil.Poll(t, time.Second, 1.0, interruptedRuns)

	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), comp))

	compareErr := prom_testutil.GatherAndCompare(
		registry,
		strings.NewReader(wantMetrics),
		"cortex_compactor_runs_completed_total",
		"cortex_compactor_runs_interrupted_total",
		"cortex_compactor_runs_failed_total",
	)
	assert.NoError(t, compareErr)

	// The interruption must be logged at info level, and nothing at error level.
	logOutput := logs.String()
	require.Contains(t, strings.Split(logOutput, "\n"), `level=info component=compactor msg="interrupting compaction of user blocks" user=user-1`)
	require.NotContains(t, logOutput, `level=error`)
}

0 comments on commit 6249935

Please sign in to comment.