From 5034d71ebbb1dfb53235390e881b483c07a66662 Mon Sep 17 00:00:00 2001 From: Douglas Camata <159076+douglascamata@users.noreply.github.com> Date: Mon, 11 Jul 2022 18:11:44 +0200 Subject: [PATCH 1/4] Bump Go version in all the GH Actions (#5487) * Bump Go version in go.mod This is a follow-up to #5258, which made the project build with Go 1.18. Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Update Go version in all GH Actions Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Run go mod tidy Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Added changelog entry Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Put back Go 1.17 in go.mod We don't use any Go 1.18 features yet, so it's not needed. Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Update go.sum after changing go.mod to go 1.17 Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Remove non-user-impacting entry for changelog Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> --- .github/workflows/docs.yaml | 4 ++-- .github/workflows/go.yaml | 4 ++-- .github/workflows/mixin.yaml | 9 ++++----- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/.github/workflows/docs.yaml b/.github/workflows/docs.yaml index 53fc22baf99..a67173d5b30 100644 --- a/.github/workflows/docs.yaml +++ b/.github/workflows/docs.yaml @@ -10,7 +10,7 @@ on: jobs: check: runs-on: ubuntu-latest - name: Documentation check + name: Documentation check env: GOBIN: /tmp/.bin steps: @@ -20,7 +20,7 @@ jobs: - name: Install Go uses: actions/setup-go@v2 with: - go-version: 1.17.x + go-version: 1.18.x - uses: actions/cache@v1 with: diff --git a/.github/workflows/go.yaml b/.github/workflows/go.yaml index 1074000be19..a41ac71eeec 100644 --- a/.github/workflows/go.yaml +++ b/.github/workflows/go.yaml @@ -42,7 +42,7 @@ jobs: - name: Install Go uses: actions/setup-go@v2 with: - go-version: 1.17.x + go-version: 1.18.x - uses: actions/cache@v1 with: @@ -59,7 +59,7 @@ jobs: fail-fast: true matrix: parallelism: [4] - index: [0,1,2,3] + index: [0, 1, 2, 3] runs-on: ubuntu-latest name: Thanos end-to-end tests env: diff --git a/.github/workflows/mixin.yaml b/.github/workflows/mixin.yaml index fd2b9ddff6a..fa73f0356a5 100644 --- a/.github/workflows/mixin.yaml +++ b/.github/workflows/mixin.yaml @@ -2,10 +2,9 @@ name: mixin on: push: - branches: [ main ] + branches: [main] pull_request: - branches: [ main ] - + branches: [main] jobs: build: @@ -17,7 +16,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: ^1.17 + go-version: 1.18.x - name: Generate run: make examples @@ -35,7 +34,7 @@ jobs: - name: Install Go uses: actions/setup-go@v2 with: - go-version: 1.17.x + go-version: 1.18.x - name: Format run: | From 79ab7c65cb4b66b9dcc4fa537cb43b00cc65066c Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 11 Jul 2022 22:55:35 -0700 Subject: [PATCH 2/4] objstore: Download and Upload block files in parallel (#5475) * Parallel Chunks Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio * test Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio * Changelog Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio * making ApplyDownloadOptions private Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio * upload concurrency Signed-off-by: 
alanprot Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio * Upload Test Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio * update change log Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio * Change comments Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio * Address comments Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio * Remove duplicate entries on changelog Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio * Addressing Comments Signed-off-by: alanprot Signed-off-by: Alan Protasio * update golang.org/x/sync Signed-off-by: alanprot Signed-off-by: Alan Protasio * Adding Comments Signed-off-by: Alan Protasio --- CHANGELOG.md | 1 + cmd/thanos/compact.go | 4 + docs/components/compact.md | 4 + go.mod | 2 +- go.sum | 3 +- pkg/block/block.go | 16 ++-- pkg/compact/compact.go | 16 +++- pkg/compact/compact_e2e_test.go | 4 +- pkg/compact/compact_test.go | 6 +- pkg/objstore/objstore.go | 162 +++++++++++++++++++++++++------- pkg/objstore/objstore_test.go | 55 +++++++++++ 11 files changed, 224 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa61f456475..f5966ef4a2d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5424](https://github.com/thanos-io/thanos/pull/5424) Receive: Export metrics regarding size of remote write requests. - [#5420](https://github.com/thanos-io/thanos/pull/5420) Receive: Automatically remove stale tenants. - [#5472](https://github.com/thanos-io/thanos/pull/5472) Receive: add new tenant metrics to example dashboard. +- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency`, allowing to configure the number of goroutines used to download/upload block files during compaction. ### Changed diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 18c2be2104c..c23189ad3b2 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -349,6 +349,7 @@ func runCompact( compactMetrics.garbageCollectedBlocks, compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason), metadata.HashFunc(conf.hashFunc), + conf.blockFilesConcurrency, ) tsdbPlanner := compact.NewPlanner(logger, levels, noCompactMarkerFilter) planner := compact.WithLargeTotalIndexSizeFilter( @@ -630,6 +631,7 @@ type compactConfig struct { waitInterval time.Duration disableDownsampling bool blockMetaFetchConcurrency int + blockFilesConcurrency int blockViewerSyncBlockInterval time.Duration blockViewerSyncBlockTimeout time.Duration cleanupBlocksInterval time.Duration @@ -688,6 +690,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage."). Default("32").IntVar(&cc.blockMetaFetchConcurrency) + cmd.Flag("block-files-concurrency", "Number of goroutines to use when fetching/uploading block files from object storage."). + Default("1").IntVar(&cc.blockFilesConcurrency) cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI."). 
Default("1m").DurationVar(&cc.blockViewerSyncBlockInterval) cmd.Flag("block-viewer.global.sync-block-timeout", "Maximum time for syncing the blocks between local and remote view for /global Block Viewer UI."). diff --git a/docs/components/compact.md b/docs/components/compact.md index fdc34c7c0d2..e86c698fd38 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -279,6 +279,10 @@ usage: thanos compact [] Continuously compacts blocks in an object store bucket. Flags: + --block-files-concurrency=1 + Number of goroutines to use when + fetching/uploading block files from object + storage. --block-meta-fetch-concurrency=32 Number of goroutines to use when fetching block metadata from object storage. diff --git a/go.mod b/go.mod index cdac7216652..de76a21fd12 100644 --- a/go.mod +++ b/go.mod @@ -86,7 +86,7 @@ require ( golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f golang.org/x/text v0.3.7 google.golang.org/api v0.78.0 google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e diff --git a/go.sum b/go.sum index 0ac474c08e4..1f4147d0b84 100644 --- a/go.sum +++ b/go.sum @@ -2262,8 +2262,9 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200930132711-30421366ff76/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/block/block.go b/pkg/block/block.go index fa2b6706267..0b6c2014348 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -45,7 +45,7 @@ const ( // Download downloads directory that is mean to be block directory. If any of the files // have a hash calculated in the meta file and it matches with what is in the destination path then // we do not download it. We always re-download the meta file. 
-func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string) error { +func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string, options ...objstore.DownloadOption) error { if err := os.MkdirAll(dst, 0750); err != nil { return errors.Wrap(err, "create dir") } @@ -74,7 +74,7 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id } } - if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), id.String(), dst, ignoredPaths...); err != nil { + if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), id.String(), dst, append(options, objstore.WithDownloadIgnoredPaths(ignoredPaths...))...); err != nil { return err } @@ -94,21 +94,21 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id // Upload uploads a TSDB block to the object storage. It verifies basic // features of Thanos block. -func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error { - return upload(ctx, logger, bkt, bdir, hf, true) +func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, options ...objstore.UploadOption) error { + return upload(ctx, logger, bkt, bdir, hf, true, options...) } // UploadPromBlock uploads a TSDB block to the object storage. It assumes // the block is used in Prometheus so it doesn't check Thanos external labels. -func UploadPromBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error { - return upload(ctx, logger, bkt, bdir, hf, false) +func UploadPromBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, options ...objstore.UploadOption) error { + return upload(ctx, logger, bkt, bdir, hf, false, options...) } // upload uploads block from given block dir that ends with block id. // It makes sure cleanup is done on error to avoid partial block uploads. // TODO(bplotka): Ensure bucket operations have reasonable backoff retries. // NOTE: Upload updates `meta.Thanos.File` section. -func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool) error { +func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool, options ...objstore.UploadOption) error { df, err := os.Stat(bdir) if err != nil { return err @@ -145,7 +145,7 @@ func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st return errors.Wrap(err, "encode meta file") } - if err := objstore.UploadDir(ctx, logger, bkt, filepath.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname)); err != nil { + if err := objstore.UploadDir(ctx, logger, bkt, filepath.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname), options...); err != nil { return cleanUp(logger, bkt, id, errors.Wrap(err, "upload chunks")) } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 6465f16927e..354d1580d89 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -232,6 +232,7 @@ type DefaultGrouper struct { blocksMarkedForDeletion prometheus.Counter blocksMarkedForNoCompact prometheus.Counter hashFunc metadata.HashFunc + blockFilesConcurrency int } // NewDefaultGrouper makes a new DefaultGrouper. 
@@ -245,6 +246,7 @@ func NewDefaultGrouper( garbageCollectedBlocks prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, hashFunc metadata.HashFunc, + blockFilesConcurrency int, ) *DefaultGrouper { return &DefaultGrouper{ bkt: bkt, @@ -275,6 +277,7 @@ func NewDefaultGrouper( garbageCollectedBlocks: garbageCollectedBlocks, blocksMarkedForDeletion: blocksMarkedForDeletion, hashFunc: hashFunc, + blockFilesConcurrency: blockFilesConcurrency, } } @@ -304,6 +307,7 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro g.blocksMarkedForDeletion, g.blocksMarkedForNoCompact, g.hashFunc, + g.blockFilesConcurrency, ) if err != nil { return nil, errors.Wrap(err, "create compaction group") @@ -342,6 +346,7 @@ type Group struct { blocksMarkedForDeletion prometheus.Counter blocksMarkedForNoCompact prometheus.Counter hashFunc metadata.HashFunc + blockFilesConcurrency int } // NewGroup returns a new compaction group. @@ -362,10 +367,16 @@ func NewGroup( blocksMarkedForDeletion prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, hashFunc metadata.HashFunc, + blockFilesConcurrency int, ) (*Group, error) { if logger == nil { logger = log.NewNopLogger() } + + if blockFilesConcurrency <= 0 { + return nil, errors.Errorf("invalid concurrency level (%d), blockFilesConcurrency level must be > 0", blockFilesConcurrency) + } + g := &Group{ logger: logger, bkt: bkt, @@ -383,6 +394,7 @@ func NewGroup( blocksMarkedForDeletion: blocksMarkedForDeletion, blocksMarkedForNoCompact: blocksMarkedForNoCompact, hashFunc: hashFunc, + blockFilesConcurrency: blockFilesConcurrency, } return g, nil } @@ -1007,7 +1019,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp } tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error { - err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir) + err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency)) return err }, opentracing.Tags{"block.id": meta.ULID}) if err != nil { @@ -1109,7 +1121,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp begin = time.Now() tracing.DoInSpanWithErr(ctx, "compaction_block_upload", func(ctx context.Context) error { - err = block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc) + err = block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc, objstore.WithUploadConcurrency(cg.blockFilesConcurrency)) return err }) if err != nil { diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 9501fc120cb..40cbcbde256 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -139,7 +139,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { testutil.Ok(t, sy.GarbageCollect(ctx)) // Only the level 3 block, the last source block in both resolutions should be left. 
- grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc) + grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc, 1) groups, err := grouper.Groups(sy.Metas()) testutil.Ok(t, err) @@ -214,7 +214,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg testutil.Ok(t, err) planner := NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter) - grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc) + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc, 1) bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true) testutil.Ok(t, err) diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index adf9201b3e4..7af7b2ec6ad 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -210,7 +210,7 @@ func TestRetentionProgressCalculate(t *testing.T) { var bkt objstore.Bucket temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for compact progress tests"}) - grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "") + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1) type groupedResult map[string]float64 @@ -376,7 +376,7 @@ func TestCompactProgressCalculate(t *testing.T) { var bkt objstore.Bucket temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for compact progress tests"}) - grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "") + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1) for _, tcase := range []struct { testName string @@ -498,7 +498,7 @@ func TestDownsampleProgressCalculate(t *testing.T) { var bkt objstore.Bucket temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for downsample progress tests"}) - grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "") + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1) for _, tcase := range []struct { testName string diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index c2f3c126438..8bf665d105c 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -12,6 +12,7 @@ import ( "path" "path/filepath" "strings" + "sync" "time" "github.com/go-kit/log" @@ -19,6 +20,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "golang.org/x/sync/errgroup" "github.com/thanos-io/thanos/pkg/runutil" ) @@ -119,6 +121,64 @@ func ApplyIterOptions(options ...IterOption) IterParams { return out } +// DownloadOption configures the provided params. +type DownloadOption func(params *downloadParams) + +// downloadParams holds the DownloadDir() parameters and is used by objstore client implementations. +type downloadParams struct { + concurrency int + ignoredPaths []string +} + +// WithDownloadIgnoredPaths is an option to set the paths that should not be downloaded. 
+func WithDownloadIgnoredPaths(ignoredPaths ...string) DownloadOption { + return func(params *downloadParams) { + params.ignoredPaths = ignoredPaths + } +} + +// WithFetchConcurrency is an option to set the concurrency of the download operation. +func WithFetchConcurrency(concurrency int) DownloadOption { + return func(params *downloadParams) { + params.concurrency = concurrency + } +} + +func applyDownloadOptions(options ...DownloadOption) downloadParams { + out := downloadParams{ + concurrency: 1, + } + for _, opt := range options { + opt(&out) + } + return out +} + +// UploadOption configures the provided params. +type UploadOption func(params *uploadParams) + +// uploadParams holds the UploadDir() parameters and is used by objstore client implementations. +type uploadParams struct { + concurrency int +} + +// WithUploadConcurrency is an option to set the concurrency of the upload operation. +func WithUploadConcurrency(concurrency int) UploadOption { + return func(params *uploadParams) { + params.concurrency = concurrency + } +} + +func applyUploadOptions(options ...UploadOption) uploadParams { + out := uploadParams{ + concurrency: 1, + } + for _, opt := range options { + opt(&out) + } + return out +} + type ObjectAttributes struct { // Size is the object size in bytes. Size int64 `json:"size"` @@ -172,29 +232,46 @@ func NopCloserWithSize(r io.Reader) io.ReadCloser { // UploadDir uploads all files in srcdir to the bucket into a top-level directory // named dstdir. It is the caller's responsibility to clean up partial uploads in case of failure. -func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string) error { +func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string, options ...UploadOption) error { df, err := os.Stat(srcdir) + opts := applyUploadOptions(options...) + + // The derived Context is canceled the first time a function passed to Go returns a non-nil error or the first + // time Wait returns, whichever occurs first. + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(opts.concurrency) + if err != nil { return errors.Wrap(err, "stat dir") } if !df.IsDir() { return errors.Errorf("%s is not a directory", srcdir) } - return filepath.WalkDir(srcdir, func(src string, d fs.DirEntry, err error) error { - if err != nil { - return err - } - if d.IsDir() { - return nil - } - srcRel, err := filepath.Rel(srcdir, src) - if err != nil { - return errors.Wrap(err, "getting relative path") - } + err = filepath.WalkDir(srcdir, func(src string, d fs.DirEntry, err error) error { + g.Go(func() error { + if err != nil { + return err + } + if d.IsDir() { + return nil + } + srcRel, err := filepath.Rel(srcdir, src) + if err != nil { + return errors.Wrap(err, "getting relative path") + } - dst := path.Join(dstdir, filepath.ToSlash(srcRel)) - return UploadFile(ctx, logger, bkt, src, dst) + dst := path.Join(dstdir, filepath.ToSlash(srcRel)) + return UploadFile(ctx, logger, bkt, src, dst) + }) + + return nil }) + + if err == nil { + err = g.Wait() + } + + return err } // UploadFile uploads the file with the given name to the bucket. @@ -254,34 +331,55 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, } // DownloadDir downloads all objects found in the directory into the local directory. 
-func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, ignoredPaths ...string) error { +func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, options ...DownloadOption) error { if err := os.MkdirAll(dst, 0750); err != nil { return errors.Wrap(err, "create dir") } + opts := applyDownloadOptions(options...) + + // The derived Context is canceled the first time a function passed to Go returns a non-nil error or the first + // time Wait returns, whichever occurs first. + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(opts.concurrency) var downloadedFiles []string - if err := bkt.Iter(ctx, src, func(name string) error { - dst := filepath.Join(dst, filepath.Base(name)) - if strings.HasSuffix(name, DirDelim) { - if err := DownloadDir(ctx, logger, bkt, originalSrc, name, dst, ignoredPaths...); err != nil { + var m sync.Mutex + + err := bkt.Iter(ctx, src, func(name string) error { + g.Go(func() error { + dst := filepath.Join(dst, filepath.Base(name)) + if strings.HasSuffix(name, DirDelim) { + if err := DownloadDir(ctx, logger, bkt, originalSrc, name, dst, options...); err != nil { + return err + } + m.Lock() + downloadedFiles = append(downloadedFiles, dst) + m.Unlock() + return nil + } + for _, ignoredPath := range opts.ignoredPaths { + if ignoredPath == strings.TrimPrefix(name, string(originalSrc)+DirDelim) { + level.Debug(logger).Log("msg", "not downloading again because a provided path matches this one", "file", name) + return nil + } + } + if err := DownloadFile(ctx, logger, bkt, name, dst); err != nil { return err } + + m.Lock() downloadedFiles = append(downloadedFiles, dst) + m.Unlock() return nil - } - for _, ignoredPath := range ignoredPaths { - if ignoredPath == strings.TrimPrefix(name, string(originalSrc)+DirDelim) { - level.Debug(logger).Log("msg", "not downloading again because a provided path matches this one", "file", name) - return nil - } - } - if err := DownloadFile(ctx, logger, bkt, name, dst); err != nil { - return err - } - - downloadedFiles = append(downloadedFiles, dst) + }) return nil - }); err != nil { + }) + + if err == nil { + err = g.Wait() + } + + if err != nil { downloadedFiles = append(downloadedFiles, dst) // Last, clean up the root dst directory. // Best-effort cleanup if the download failed. 
for _, f := range downloadedFiles { diff --git a/pkg/objstore/objstore_test.go b/pkg/objstore/objstore_test.go index cd683028cc4..950fd82eb18 100644 --- a/pkg/objstore/objstore_test.go +++ b/pkg/objstore/objstore_test.go @@ -7,11 +7,14 @@ import ( "bytes" "context" "io" + "io/ioutil" "os" + "strings" "testing" "github.com/go-kit/log" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" "go.uber.org/atomic" @@ -92,6 +95,58 @@ func TestTracingReader(t *testing.T) { testutil.Equals(t, int64(11), size) } +func TestDownloadUploadDirConcurrency(t *testing.T) { + r := prometheus.NewRegistry() + m := BucketWithMetrics("", NewInMemBucket(), r) + tempDir := t.TempDir() + + testutil.Ok(t, m.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1")))) + testutil.Ok(t, m.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2")))) + testutil.Ok(t, m.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3")))) + + testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(` + # HELP thanos_objstore_bucket_operations_total Total number of all attempted operations against a bucket. + # TYPE thanos_objstore_bucket_operations_total counter + thanos_objstore_bucket_operations_total{bucket="",operation="attributes"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="delete"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="exists"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="get"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="get_range"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="iter"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="upload"} 3 + `), `thanos_objstore_bucket_operations_total`)) + + testutil.Ok(t, DownloadDir(context.Background(), log.NewNopLogger(), m, "dir/", "dir/", tempDir, WithFetchConcurrency(10))) + i, err := ioutil.ReadDir(tempDir) + testutil.Ok(t, err) + testutil.Assert(t, len(i) == 3) + testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(` + # HELP thanos_objstore_bucket_operations_total Total number of all attempted operations against a bucket. + # TYPE thanos_objstore_bucket_operations_total counter + thanos_objstore_bucket_operations_total{bucket="",operation="attributes"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="delete"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="exists"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="get"} 3 + thanos_objstore_bucket_operations_total{bucket="",operation="get_range"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="iter"} 1 + thanos_objstore_bucket_operations_total{bucket="",operation="upload"} 3 + `), `thanos_objstore_bucket_operations_total`)) + + testutil.Ok(t, UploadDir(context.Background(), log.NewNopLogger(), m, tempDir, "/dir-copy", WithUploadConcurrency(10))) + + testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(` + # HELP thanos_objstore_bucket_operations_total Total number of all attempted operations against a bucket. 
+ # TYPE thanos_objstore_bucket_operations_total counter + thanos_objstore_bucket_operations_total{bucket="",operation="attributes"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="delete"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="exists"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="get"} 3 + thanos_objstore_bucket_operations_total{bucket="",operation="get_range"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="iter"} 1 + thanos_objstore_bucket_operations_total{bucket="",operation="upload"} 6 + `), `thanos_objstore_bucket_operations_total`)) +} + func TestTimingTracingReader(t *testing.T) { m := BucketWithMetrics("", NewInMemBucket(), nil) r := bytes.NewReader([]byte("hello world")) From 72175302bf32b90963f5c3f67b6f2e5763f8fc8b Mon Sep 17 00:00:00 2001 From: Matej Gera <38492574+matej-g@users.noreply.github.com> Date: Tue, 12 Jul 2022 08:01:39 +0200 Subject: [PATCH 3/4] Use default HTTP config for E2E S3 tests (#5483) Signed-off-by: Matej Gera --- test/e2e/e2ethanos/services.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 88868dc3b77..41fccca04da 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -967,19 +967,20 @@ func genCerts(certPath, privkeyPath, caPath, serverName string) error { } func NewS3Config(bucket, endpoint, basePath string) s3.Config { + httpDefaultConf := s3.DefaultConfig.HTTPConfig + httpDefaultConf.TLSConfig = exthttp.TLSConfig{ + CAFile: filepath.Join(basePath, "certs", "CAs", "ca.crt"), + CertFile: filepath.Join(basePath, "certs", "public.crt"), + KeyFile: filepath.Join(basePath, "certs", "private.key"), + } + return s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: endpoint, - Insecure: false, - HTTPConfig: exthttp.HTTPConfig{ - TLSConfig: exthttp.TLSConfig{ - CAFile: filepath.Join(basePath, "certs", "CAs", "ca.crt"), - CertFile: filepath.Join(basePath, "certs", "public.crt"), - KeyFile: filepath.Join(basePath, "certs", "private.key"), - }, - }, + Bucket: bucket, + AccessKey: e2edb.MinioAccessKey, + SecretKey: e2edb.MinioSecretKey, + Endpoint: endpoint, + Insecure: false, + HTTPConfig: httpDefaultConf, BucketLookupType: s3.AutoLookup, } } From ca4fe82d74f6b0c7323ad900a25a589c9abfedc0 Mon Sep 17 00:00:00 2001 From: Naveen <172697+naveensrinivasan@users.noreply.github.com> Date: Tue, 12 Jul 2022 01:02:27 -0500 Subject: [PATCH 4/4] chore: Included githubactions in the dependabot config (#5364) This should help keep the GitHub Actions up to date on new releases. It will also help keep them secure. 
Dependabot helps keep the supply chain secure (https://docs.github.com/en/code-security/dependabot) and keeps GitHub Actions up to date (https://docs.github.com/en/code-security/dependabot/working-with-dependabot/keeping-your-actions-up-to-date-with-dependabot). See also the OpenSSF Scorecard dependency-update-tool check: https://github.com/ossf/scorecard/blob/main/docs/checks.md#dependency-update-tool Signed-off-by: naveensrinivasan <172697+naveensrinivasan@users.noreply.github.com> --- .github/dependabot.yml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 81e527f988f..d751f8c96c6 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -12,3 +12,10 @@ updates: interval: "weekly" labels: ["dependencies"] + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: weekly + labels: + - "dependencies" +
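A note for readers of patch 2 above (objstore: Download and Upload block files in parallel, #5475): it pairs Go's functional-options pattern (the new DownloadOption/UploadOption types, with WithFetchConcurrency and WithUploadConcurrency defaulting to 1) with golang.org/x/sync/errgroup; the go.mod bump exists precisely to pick up (*errgroup.Group).SetLimit. The sketch below shows that combination in miniature. It is illustrative only: uploadAll and its simulated upload are hypothetical stand-ins, not the Thanos implementation.

```go
// A minimal, self-contained sketch of bounded-concurrency uploads driven
// by functional options, in the style of patch 2.
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// uploadParams holds settings that callers tweak through options.
type uploadParams struct {
	concurrency int
}

// UploadOption mutates uploadParams; callers pass zero or more of these.
type UploadOption func(*uploadParams)

// WithUploadConcurrency caps how many files are uploaded at once.
func WithUploadConcurrency(n int) UploadOption {
	return func(p *uploadParams) { p.concurrency = n }
}

// uploadAll (hypothetical) uploads files with at most opts.concurrency goroutines.
func uploadAll(ctx context.Context, files []string, options ...UploadOption) error {
	opts := uploadParams{concurrency: 1} // default: serial, the old behaviour
	for _, o := range options {
		o(&opts)
	}

	// The derived ctx is cancelled on the first error, so in-flight work
	// can stop early; SetLimit bounds the parallelism.
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(opts.concurrency)

	for _, f := range files {
		f := f // capture loop variable (needed before Go 1.22)
		g.Go(func() error {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(10 * time.Millisecond): // stand-in for the real upload
				fmt.Println("uploaded", f)
				return nil
			}
		})
	}
	return g.Wait() // returns the first non-nil error, if any
}

func main() {
	files := []string{"chunks/000001", "index", "meta.json"}
	if err := uploadAll(context.Background(), files, WithUploadConcurrency(2)); err != nil {
		fmt.Println("upload failed:", err)
	}
}
```

Defaulting the option to 1 keeps the previous serial behaviour unless a caller opts in, which is why the new --block-files-concurrency flag also defaults to 1.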
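The download path in the same patch adds one more ingredient: several goroutines now append to the shared downloadedFiles slice, so DownloadDir guards it with a sync.Mutex, and the recorded names drive the best-effort cleanup when a download fails. A minimal sketch of that pattern, assuming hypothetical downloadAll and fakeFetch helpers in place of the real DownloadFile machinery:

```go
// A sketch of mutex-guarded bookkeeping for concurrent downloads: each
// goroutine records what it fetched so a failed run can clean up.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

func downloadAll(ctx context.Context, names []string, concurrency int) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(concurrency)

	var (
		mu         sync.Mutex
		downloaded []string // appended to from many goroutines, hence the lock
	)
	for _, name := range names {
		name := name
		g.Go(func() error {
			if err := fakeFetch(ctx, name); err != nil {
				return err
			}
			mu.Lock()
			downloaded = append(downloaded, name)
			mu.Unlock()
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		// Best-effort cleanup of whatever finished before the failure,
		// mirroring the cleanup loop at the end of DownloadDir.
		for _, f := range downloaded {
			fmt.Println("cleaning up", f)
		}
		return err
	}
	return nil
}

// fakeFetch is a placeholder for a real object download.
func fakeFetch(ctx context.Context, name string) error {
	if name == "bad" {
		return errors.New("simulated download failure")
	}
	fmt.Println("downloaded", name)
	return nil
}

func main() {
	err := downloadAll(context.Background(), []string{"index", "meta.json", "bad"}, 2)
	fmt.Println("result:", err)
}
```

Collecting names under the lock rather than deleting eagerly keeps the error path simple: whatever made it into the slice is exactly what needs removing.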