Skip to content

Commit

Permalink
add index header idle delete timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Alibi Shalgymbay <aliby.bbb@gmail.com>
  • Loading branch information
bodevone committed Feb 12, 2024
1 parent f28680c commit dac7586
Show file tree
Hide file tree
Showing 10 changed files with 420 additions and 51 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -17,6 +17,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Added

- [#7118](https://github.com/thanos-io/thanos/pull/7118) Store Gateway: Added `--store.index-header-lazy-reader-idle-delete-timeout` to periodically delete idle index header files, when lazy reader is enabled and idle timeout for unloading is set > 0.

### Changed

- [#7123](https://github.com/thanos-io/thanos/pull/7123) Rule: Change default Alertmanager API version to v2.
Expand Down
71 changes: 38 additions & 33 deletions cmd/thanos/store.go
Expand Up @@ -57,39 +57,40 @@ const (
)

type storeConfig struct {
indexCacheConfigs extflag.PathOrContent
objStoreConfig extflag.PathOrContent
dataDir string
cacheIndexHeader bool
grpcConfig grpcConfig
httpConfig httpConfig
indexCacheSizeBytes units.Base2Bytes
chunkPoolSize units.Base2Bytes
estimatedMaxSeriesSize uint64
estimatedMaxChunkSize uint64
seriesBatchSize int
storeRateLimits store.SeriesSelectLimits
maxDownloadedBytes units.Base2Bytes
maxConcurrency int
component component.StoreAPI
debugLogging bool
syncInterval time.Duration
blockSyncConcurrency int
blockMetaFetchConcurrency int
filterConf *store.FilterConfig
selectorRelabelConf extflag.PathOrContent
advertiseCompatibilityLabel bool
consistencyDelay commonmodel.Duration
ignoreDeletionMarksDelay commonmodel.Duration
disableWeb bool
webConfig webConfig
label string
postingOffsetsInMemSampling int
cachingBucketConfig extflag.PathOrContent
reqLogConfig *extflag.PathOrContent
lazyIndexReaderEnabled bool
lazyIndexReaderIdleTimeout time.Duration
lazyExpandedPostingsEnabled bool
indexCacheConfigs extflag.PathOrContent
objStoreConfig extflag.PathOrContent
dataDir string
cacheIndexHeader bool
grpcConfig grpcConfig
httpConfig httpConfig
indexCacheSizeBytes units.Base2Bytes
chunkPoolSize units.Base2Bytes
estimatedMaxSeriesSize uint64
estimatedMaxChunkSize uint64
seriesBatchSize int
storeRateLimits store.SeriesSelectLimits
maxDownloadedBytes units.Base2Bytes
maxConcurrency int
component component.StoreAPI
debugLogging bool
syncInterval time.Duration
blockSyncConcurrency int
blockMetaFetchConcurrency int
filterConf *store.FilterConfig
selectorRelabelConf extflag.PathOrContent
advertiseCompatibilityLabel bool
consistencyDelay commonmodel.Duration
ignoreDeletionMarksDelay commonmodel.Duration
disableWeb bool
webConfig webConfig
label string
postingOffsetsInMemSampling int
cachingBucketConfig extflag.PathOrContent
reqLogConfig *extflag.PathOrContent
lazyIndexReaderEnabled bool
lazyIndexReaderIdleTimeout time.Duration
lazyExpandedPostingsEnabled bool
lazyIndexReaderIdleDeleteTimeout time.Duration

indexHeaderLazyDownloadStrategy string
}
Expand Down Expand Up @@ -186,6 +187,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity.").
Hidden().Default("5m").DurationVar(&sc.lazyIndexReaderIdleTimeout)

cmd.Flag("store.index-header-lazy-reader-idle-delete-timeout", "If index-header lazy reader is enabled, index-header-lazy-reader-idle-timeout is > 0 and this idle timeout is > 0, index header files will be automatically deleted after 'idle timeout' inactivity").
Hidden().Default("24h").DurationVar(&sc.lazyIndexReaderIdleDeleteTimeout)

cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.").
Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled)

Expand Down Expand Up @@ -418,6 +422,7 @@ func runStore(
false,
conf.lazyIndexReaderEnabled,
conf.lazyIndexReaderIdleTimeout,
conf.lazyIndexReaderIdleDeleteTimeout,
options...,
)
if err != nil {
Expand Down
70 changes: 70 additions & 0 deletions pkg/block/indexheader/lazy_binary_reader.go
Expand Up @@ -26,6 +26,8 @@ import (
var (
errNotIdle = errors.New("the reader is not idle")
errUnloadedWhileLoading = errors.New("the index-header has been concurrently unloaded")
errNotIdleForDelete = errors.New("the reader is not idle for delete")
errLoadedForDelete = errors.New("the reader is loaded for delete")
)

// LazyBinaryReaderMetrics holds metrics tracked by LazyBinaryReader.
Expand All @@ -34,6 +36,8 @@ type LazyBinaryReaderMetrics struct {
loadFailedCount prometheus.Counter
unloadCount prometheus.Counter
unloadFailedCount prometheus.Counter
deleteCount prometheus.Counter
deleteFailedCount prometheus.Counter
loadDuration prometheus.Histogram
}

Expand All @@ -56,6 +60,14 @@ func NewLazyBinaryReaderMetrics(reg prometheus.Registerer) *LazyBinaryReaderMetr
Name: "indexheader_lazy_unload_failed_total",
Help: "Total number of failed index-header lazy unload operations.",
}),
deleteCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "indexheader_lazy_delete_total",
Help: "Total number of index-header lazy delete operations.",
}),
deleteFailedCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "indexheader_lazy_delete_failed_total",
Help: "Total number of failed index-header lazy delete operations.",
}),
loadDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "indexheader_lazy_load_duration_seconds",
Help: "Duration of the index-header lazy loading in seconds.",
Expand Down Expand Up @@ -319,3 +331,61 @@ func (r *LazyBinaryReader) isIdleSince(ts int64) bool {

return loaded
}

// deleteIfIdleSince deletes index header file if the reader is idle since given time (as unix nano). if idleSince is 0,
// the check on the last usage is skipped. Removal of index header file occurs only if reader is unloaded.
// Calling this function on an already deleted index header file is a no-op.
func (r *LazyBinaryReader) deleteIfIdleSince(ts int64) error {
// Nothing to do if reader is loaded.
r.readerMx.RLock()
loaded := r.reader != nil
r.readerMx.RUnlock()

if loaded {
return errLoadedForDelete
}

if ts > 0 && r.usedAt.Load() > ts {
return errNotIdleForDelete
}

indexHeaderFile := filepath.Join(r.dir, r.id.String(), block.IndexHeaderFilename)

// Nothing to do if already deleted.
if _, err := os.Stat(indexHeaderFile); os.IsNotExist(err) {
return errors.Wrap(err, "read index header")
}

r.metrics.deleteCount.Inc()
if err := os.Remove(indexHeaderFile); err != nil {
r.metrics.deleteFailedCount.Inc()
return errors.Wrap(err, "remove index header")
}

return nil
}

// isIdleForDeleteSince returns true if the reader is idle since given time (as unix nano), unloaded
// and index header file is present.
func (r *LazyBinaryReader) isIdleForDeleteSince(ts int64) bool {
if r.usedAt.Load() > ts {
return false
}

// A reader can be considered idle for delete only if it's unloaded.
r.readerMx.RLock()
loaded := r.reader != nil
r.readerMx.RUnlock()

if loaded {
return false
}

// A reader can be considered idle for delete only if it's present.
indexHeaderFile := filepath.Join(r.dir, r.id.String(), block.IndexHeaderFilename)
if _, err := os.Stat(indexHeaderFile); os.IsNotExist(err) {
return false
}

return true
}
174 changes: 174 additions & 0 deletions pkg/block/indexheader/lazy_binary_reader_test.go
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
promtestutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/objstore/providers/filesystem"
Expand Down Expand Up @@ -320,3 +321,176 @@ func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) {
})
}
}

func TestLazyBinaryReader_delete_ShouldReturnErrorIfNotIdle(t *testing.T) {
ctx := context.Background()

tmpDir := t.TempDir()

bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
testutil.Ok(t, err)
defer func() { testutil.Ok(t, bkt.Close()) }()

// Create block.
blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))

for _, lazyDownload := range []bool{false, true} {
t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) {
m := NewLazyBinaryReaderMetrics(nil)
bm := NewBinaryReaderMetrics(nil)
r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload)
testutil.Ok(t, err)
testutil.Assert(t, r.reader == nil)

indexHeaderFile := filepath.Join(r.dir, r.id.String(), block.IndexHeaderFilename)

// Should lazy load the index upon first usage.
labelNames, err := r.LabelNames()
testutil.Ok(t, err)
testutil.Equals(t, []string{"a"}, labelNames)
testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.deleteCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.deleteFailedCount))
// Index header file is present
f, err := os.Stat(indexHeaderFile)
testutil.Ok(t, err)
testutil.Assert(t, f != nil)

// Try to unload (not enough time) and delete (not enough time).
testutil.Equals(t, errNotIdle, r.unloadIfIdleSince(time.Now().Add(-time.Minute).UnixNano()))
testutil.Equals(t, errLoadedForDelete, r.deleteIfIdleSince(time.Now().Add(-time.Minute).UnixNano()))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.deleteCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.deleteFailedCount))
// Index header file is present
f, err = os.Stat(indexHeaderFile)
testutil.Ok(t, err)
testutil.Assert(t, f != nil)

// Try to unload (enough time) and delete (not enough time).
testutil.Ok(t, r.unloadIfIdleSince(time.Now().UnixNano()))
testutil.Equals(t, errNotIdleForDelete, r.deleteIfIdleSince(time.Now().Add(-time.Minute).UnixNano()))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.deleteCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.deleteFailedCount))
// Index header file is present
f, err = os.Stat(indexHeaderFile)
testutil.Ok(t, err)
testutil.Assert(t, f != nil)

// Try to delete (enough time).
testutil.Ok(t, r.deleteIfIdleSince(time.Now().UnixNano()))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.deleteCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.deleteFailedCount))
// Index header file is present
f, err = os.Stat(indexHeaderFile)
testutil.NotOk(t, err)
testutil.Assert(t, f == nil)
})
}
}

func TestLazyBinaryReader_LoadUnloadDeleteRaceCondition(t *testing.T) {
// Run the test for a fixed amount of time.
const runDuration = 5 * time.Second

ctx := context.Background()

tmpDir := t.TempDir()

bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
testutil.Ok(t, err)
defer func() { testutil.Ok(t, bkt.Close()) }()

// Create block.
blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))

for _, lazyDownload := range []bool{false, true} {
t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) {
m := NewLazyBinaryReaderMetrics(nil)
bm := NewBinaryReaderMetrics(nil)
r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload)
testutil.Ok(t, err)
testutil.Assert(t, r.reader == nil)
t.Cleanup(func() {
testutil.Ok(t, r.Close())
})

done := make(chan struct{})
time.AfterFunc(runDuration, func() { close(done) })
wg := sync.WaitGroup{}
wg.Add(3)

// Start a goroutine which continuously try to unload the reader.
go func() {
defer wg.Done()

for {
select {
case <-done:
return
default:
testutil.Ok(t, r.unloadIfIdleSince(0))
}
}
}()

// Start a goroutine which continuously try to delete the index header.
go func() {
defer wg.Done()

for {
select {
case <-done:
return
default:
err := r.deleteIfIdleSince(0)
testutil.Assert(t, err == nil || err == errLoadedForDelete || errors.Is(err, os.ErrNotExist))
}
}
}()

// Try to read multiple times, while the other goroutines continuously try to unload and delete it.
go func() {
defer wg.Done()

for {
select {
case <-done:
return
default:
_, err := r.PostingsOffset("a", "1")
testutil.Assert(t, err == nil || err == errUnloadedWhileLoading)

}
}
}()

// Wait until all goroutines have done.
wg.Wait()
})
}
}

0 comments on commit dac7586

Please sign in to comment.