compactor: multi-store support #7447
Conversation
Signed-off-by: Ashwanth Goli <iamashwanth@gmail.com>
./tools/diff_coverage.sh ../loki-main/test_results.txt test_results.txt ingester,distributor,querier,querier/queryrange,iter,storage,chunkenc,logql,loki

Change in test coverage per package. Green indicates 0 or positive change, red indicates that test coverage for a package fell.

+ ingester            0%
+ distributor         0%
+ querier             0%
+ querier/queryrange  0%
+ iter                0%
+ storage             0%
+ chunkenc            0%
+ logql               0%
- loki             -0.5%
I can't comment on the compaction behaviour because I'm not too familiar with it, but overall the changes LGTM.
I've left a few nits for clarity and idiomatic Go practices.
f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.compactor.shared-store.key-prefix", "index/", "Prefix to add to object keys in the shared store. The path separator (if any) should always be '/'. The prefix should never start with a separator but should always end with one.")
f.DurationVar(&cfg.CompactionInterval, "boltdb.shipper.compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.")
f.DurationVar(&cfg.ApplyRetentionInterval, "boltdb.shipper.compactor.apply-retention-interval", 0, "Interval at which to apply/enforce retention. 0 means run at the same interval as compaction. If non-zero, it should always be a multiple of the compaction interval.")
f.DurationVar(&cfg.RetentionDeleteDelay, "boltdb.shipper.compactor.retention-delete-delay", 2*time.Hour, "Delay after which chunks will be fully deleted during retention.")
f.BoolVar(&cfg.RetentionEnabled, "boltdb.shipper.compactor.retention-enabled", false, "(Experimental) Activate custom (per-stream, per-tenant) retention.")
f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total number of workers to use to delete chunks.")
f.StringVar(&cfg.DeleteRequestStore, "boltdb.shipper.compactor.delete-request-store", "", "Store used for managing delete requests. If not set, shared_store is used as a fallback.")
Do we even need to provide this as an option? Under what circumstances do you see a separate store being needed?
I do like that you're falling back to the shared store, though.
It need not be a separate store; users can choose one of the object stores already used for the index.
For the index, we know where to route reads/writes because of the schema_config, but I believe the same cannot be done when storing delete requests, as they don't have any period associated with them.
We could implicitly assume that delete requests should always go to the object store referred to in the latest schema_config entry, but this would result in the compactor not processing any pending requests from older stores. I thought making this configurable would allow users to change it as they see fit.
I could also document this better.
pkg/storage/stores/indexshipper/compactor/retention/retention.go
[Doc squad] I had a couple of small suggestions for wording.
[Docs squad] Documentation LGTM. A couple of small suggestions.
Overall, the changes look good to me on an initial pass. I need to think more about how we approach the deletion markers migration.
docs/sources/upgrading/_index.md

If `-boltdb.shipper.compactor.shared-store` is not set, it defaults to the `object_store` configured in the latest `period_config` that uses either the tsdb or boltdb-shipper index.

In releases 2.7.5 and later, the Compactor supports index compaction on multiple stores.
- In releases 2.7.5 and later, the Compactor supports index compaction on multiple stores.
+ In releases 2.7.5 and later, the Compactor supports index compaction on multiple buckets/object stores.
pkg/loki/config_wrapper.go

if i := lastBoltdbShipperConfig(r.SchemaConfig.Configs); i != len(r.SchemaConfig.Configs) {
	betterBoltdbShipperDefaults(r, &defaults, r.SchemaConfig.Configs[i])
}

if i := lastTSDBConfig(r.SchemaConfig.Configs); i != len(r.SchemaConfig.Configs) {
	betterTSDBShipperDefaults(r, &defaults, r.SchemaConfig.Configs[i])
}
I think we should use the last boltdb-shipper or tsdb config. Here we would just overwrite the value set by the previous one, which could be different. We can maybe define a separate function for the compactor, e.g. `betterCompactorDefaults`.
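The reviewer's suggestion can be sketched as follows: instead of letting whichever helper runs last overwrite the value, pick the last period config that uses an object-store index. `PeriodConfig` here is a minimal stand-in for Loki's schema period config, and `lastObjectStoreIndexConfig` is a hypothetical helper name, not code from the PR.

```go
package main

import "fmt"

// PeriodConfig is a simplified stand-in for Loki's schema period config.
type PeriodConfig struct {
	IndexType   string
	ObjectStore string
}

// lastObjectStoreIndexConfig returns the index of the last period config that
// uses an object-store index (boltdb-shipper or tsdb), or -1 if none do.
// Scanning from the end means an earlier period can never win over a later one.
func lastObjectStoreIndexConfig(configs []PeriodConfig) int {
	for i := len(configs) - 1; i >= 0; i-- {
		if configs[i].IndexType == "boltdb-shipper" || configs[i].IndexType == "tsdb" {
			return i
		}
	}
	return -1
}

func main() {
	configs := []PeriodConfig{
		{IndexType: "boltdb-shipper", ObjectStore: "s3"},
		{IndexType: "tsdb", ObjectStore: "gcs"},
	}
	// The later tsdb period is picked, so its object store drives the defaults.
	fmt.Println(lastObjectStoreIndexConfig(configs)) // 1
}
```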
if err := c.initDeletes(r, limits); err != nil {
	return err
}

if err := retention.MigrateMarkers(filepath.Join(c.cfg.WorkingDirectory, "retention"), deleteRequestStore); err != nil {
Deletion markers might not always belong to objects in the delete request store. There is a high chance that when someone wants to start using a new bucket, they would change the bucket for the delete request store as well. In that case we would try deleting objects from the new bucket here, while the objects were stored in the older bucket. Will think more and see what we can do here.
encoder = client.FSEncoder

deleteRequestStore := c.cfg.DeleteRequestStore
// if -boltdb.shipper.compactor.shared-store is set, use it instead to ensure backward compatibility.
if c.cfg.SharedStoreType != "" {
I think we should give precedence to `c.cfg.DeleteRequestStore` since it is a new and explicit config. Any change in that config would have been made by the user deliberately. Also, it looks inconsistent not to honor an explicit config.
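The two precedence choices under discussion can be sketched side by side. These are hypothetical helpers for illustration, not Loki code: the PR's code lets the legacy shared-store flag win for backward compatibility, while the reviewer suggests the explicit new flag should win.

```go
package main

import "fmt"

// storePRBehavior mirrors the PR as written: a non-empty legacy
// shared-store setting overrides the new delete-request-store flag.
func storePRBehavior(deleteRequestStore, sharedStore string) string {
	if sharedStore != "" { // legacy flag takes precedence
		return sharedStore
	}
	return deleteRequestStore
}

// storeSuggested mirrors the reviewer's suggestion: an explicitly set
// delete-request-store wins, with shared-store only as the fallback.
func storeSuggested(deleteRequestStore, sharedStore string) string {
	if deleteRequestStore != "" { // explicit new config takes precedence
		return deleteRequestStore
	}
	return sharedStore
}

func main() {
	// With both flags set, the two policies disagree:
	fmt.Println(storePRBehavior("s3", "gcs")) // gcs
	fmt.Println(storeSuggested("s3", "gcs")) // s3
}
```

Either way the fallback behaves identically when only one flag is set; the policies only diverge when a user sets both.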
The changes look great. I just have some non-blocking suggestions for code readability, so I'm approving the PR. Will merge it once the feedback has been addressed.
@@ -87,12 +90,11 @@ type Marker struct {
 	markTimeout time.Duration
 }
 
-func NewMarker(workingDirectory string, expiration ExpirationChecker, markTimeout time.Duration, chunkClient client.Client, r prometheus.Registerer) (*Marker, error) {
+func NewMarker(workingDirectory, objectStoreType string, expiration ExpirationChecker, markTimeout time.Duration, chunkClient client.Client, r prometheus.Registerer) (*Marker, error) {
 	metrics := newMarkerMetrics(r)
unused param objectStoreType
@@ -272,8 +274,9 @@ type Sweeper struct {
 	sweeperMetrics *sweeperMetrics
 }
 
-func NewSweeper(workingDir string, deleteClient ChunkClient, deleteWorkerCount int, minAgeDelete time.Duration, r prometheus.Registerer) (*Sweeper, error) {
+func NewSweeper(workingDir, objectStoreType string, deleteClient ChunkClient, deleteWorkerCount int, minAgeDelete time.Duration, r prometheus.Registerer) (*Sweeper, error) {
unused param `objectStoreType`
@@ -79,6 +80,8 @@ type Config struct {
 	RetentionDeleteDelay     time.Duration `yaml:"retention_delete_delay"`
 	RetentionDeleteWorkCount int           `yaml:"retention_delete_worker_count"`
 	RetentionTableTimeout    time.Duration `yaml:"retention_table_timeout"`
+	DeleteRequestStore       string        `yaml:"delete_request_store"`
+	LegacySharedStoreDefault string        `yaml:"-" doc:"hidden"`
We can be more specific here and call it something like `DefaultDeleteRequestStore` or `LegacyDefaultDeleteRequestStore`.
pkg/storage/config/schema_config.go
// ContainsObjectStorageIndex returns true if the current or any of the upcoming periods
// use an object store index.
func ContainsObjectStorageIndex(configs []PeriodConfig) bool {
	return usingForPeriodConfigs(configs, fn)
I called it `UsingObjectStoreIndex` because it checked whether the current or upcoming index is an object storage based index. The new name makes it confusing. Maybe swap the `UsingObjectStoreIndex` and `ContainsObjectStorageIndex` names?
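The distinction being discussed — "current or upcoming" periods only, not past ones — can be sketched with a simplified model. `PeriodConfig` and the integer `Start` field here are stand-ins; the real Loki type keys periods by start time and delegates to `usingForPeriodConfigs`.

```go
package main

import "fmt"

// PeriodConfig is a simplified stand-in for Loki's schema period config;
// Start is a toy integer timestamp marking when the period takes effect.
type PeriodConfig struct {
	Start     int
	IndexType string
}

// usingObjectStoreIndex reports whether the period active at `now` or any
// later period uses an object-store index (boltdb-shipper or tsdb).
// Periods that ended before `now` are deliberately ignored.
func usingObjectStoreIndex(configs []PeriodConfig, now int) bool {
	// the active period is the last one whose Start is <= now
	active := 0
	for i, c := range configs {
		if c.Start <= now {
			active = i
		}
	}
	for _, c := range configs[active:] {
		if c.IndexType == "boltdb-shipper" || c.IndexType == "tsdb" {
			return true
		}
	}
	return false
}

func main() {
	configs := []PeriodConfig{
		{Start: 0, IndexType: "boltdb-shipper"},
		{Start: 10, IndexType: "bigtable"},
	}
	// Only the past period used an object-store index, so at now=15 this is false.
	fmt.Println(usingObjectStoreIndex(configs, 15)) // false
	fmt.Println(usingObjectStoreIndex(configs, 5))  // true
}
```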
// MigrateMarkers checks for markers in the retention dir and migrates them.
// Since the compactor supports multiple stores, markers need to be written to a store-specific dir.
func MigrateMarkers(workingDir string, store string) error {
Let us be more specific and call it `CopyMarkers`? Also maybe make the comment more descriptive, saying we are copying markers to the store-specific marker directory?
Signed-off-by: Ashwanth Goli <iamashwanth@gmail.com>
LGTM
**What this PR does / why we need it**: noticed that a couple of unreleased changes are incorrectly showing under the 2.8.0 release; moving them to the right place. Also adds the missing changelog entry for #7447

**Checklist**
- [X] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](d10549e)

Signed-off-by: Ashwanth Goli <iamashwanth@gmail.com>
What this PR does / why we need it:
This PR adds multi-store support for compactors. Since Loki allows users to configure multiple stores using schema_config, the compactor should be able to operate on all object stores that contain an index. Currently, it can perform compaction on indexes in a single store only.

To maintain backward compatibility: if `boltdb.shipper.compactor.shared-store` is set, the compactor will only operate on that store; otherwise the compactor will be initialized to operate on all the object store indexes (`boltdb`, `tsdb`) defined in the schema config.

This PR also adds a new config option to define where delete requests are to be stored: `boltdb.shipper.compactor.delete-request-store`. If it's not set, `boltdb.shipper.compactor.shared-store` is used for storing them; this ensures no config changes are required by users when upgrading. Refer to docs/sources/upgrading/_index.md for more details.

Which issue(s) this PR fixes:
Fixes #7276

Checklist
- Reviewed the `CONTRIBUTING.md` guide
- `CHANGELOG.md` updated
- Documented in `docs/sources/upgrading/_index.md`