compactor: multi-store support (#7447)
**What this PR does / why we need it**:
This PR adds multi-store support for the compactor. Since Loki allows users
to configure multiple stores using `schema_config`, the compactor should be
able to operate on multiple object stores that contain an index. Currently,
it can only perform compaction on indexes in a single store.

To maintain backward compatibility, if
`boltdb.shipper.compactor.shared-store` is set, the compactor will only
operate on that store; otherwise, the compactor will be initialized to
operate on all the object store indexes (`boltdb`, `tsdb`) defined in the
schema config.
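The selection rule above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: `periodConfig` and `storesToCompact` are hypothetical names, and the sketch assumes that only periods whose index type is `boltdb-shipper` or `tsdb` keep their index in an object store.

```go
package main

import "fmt"

// periodConfig is a hypothetical, trimmed-down stand-in for one
// schema_config entry; it is not the type used by Loki.
type periodConfig struct {
	IndexType   string // e.g. "boltdb-shipper" or "tsdb"
	ObjectStore string // e.g. "gcs", "s3", "filesystem"
}

// storesToCompact returns the object stores the compactor would operate on.
// If sharedStore is set, only that store is used (backward compatible).
// Otherwise every distinct object store that holds a boltdb-shipper or
// tsdb index is returned, in order of first appearance.
func storesToCompact(sharedStore string, periods []periodConfig) []string {
	if sharedStore != "" {
		return []string{sharedStore}
	}

	seen := make(map[string]bool)
	var stores []string
	for _, p := range periods {
		if p.IndexType != "boltdb-shipper" && p.IndexType != "tsdb" {
			continue // assumed: other index types are not kept in object storage
		}
		if !seen[p.ObjectStore] {
			seen[p.ObjectStore] = true
			stores = append(stores, p.ObjectStore)
		}
	}
	return stores
}

func main() {
	periods := []periodConfig{
		{IndexType: "boltdb-shipper", ObjectStore: "gcs"},
		{IndexType: "tsdb", ObjectStore: "s3"},
	}
	fmt.Println(storesToCompact("", periods))    // shared store unset: all index stores
	fmt.Println(storesToCompact("gcs", periods)) // shared store set: only that store
}
```

De-duplicating the stores matters because several schema periods can point at the same bucket, and the compactor only needs one client per store.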

This PR also adds a new config option to define where delete requests
are to be stored: `boltdb.shipper.compactor.delete-request-store`. If
it's not set, `boltdb.shipper.compactor.shared-store` is used for
storing them; this ensures that no config changes are required by
users when upgrading. Refer to
[docs/sources/upgrading/_index.md](https://github.com/grafana/loki/blob/a85d01c9cd3c62a62b29a20274b71d3c51f1d11a/docs/sources/upgrading/_index.md)
for more details.
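As a rough sketch of the precedence described here and in the upgrade notes of this commit: `delete-request-store` wins, then `shared-store`, and finally the `object_store` of the latest period that uses a tsdb or boltdb-shipper index. The names `schemaPeriod` and `resolveDeleteRequestStore` are hypothetical, and the sketch assumes `periods` is ordered from oldest to newest.

```go
package main

import "fmt"

// schemaPeriod is a hypothetical, simplified schema_config entry.
type schemaPeriod struct {
	IndexType   string
	ObjectStore string
}

// resolveDeleteRequestStore picks the store used for delete requests.
// Precedence: explicit delete-request-store, then shared-store, then the
// object store of the latest period with a tsdb or boltdb-shipper index.
// It returns "" if none of these yields a store.
func resolveDeleteRequestStore(deleteRequestStore, sharedStore string, periods []schemaPeriod) string {
	if deleteRequestStore != "" {
		return deleteRequestStore
	}
	if sharedStore != "" {
		return sharedStore
	}
	// Walk from newest to oldest (assumes periods are sorted by start date).
	for i := len(periods) - 1; i >= 0; i-- {
		p := periods[i]
		if p.IndexType == "tsdb" || p.IndexType == "boltdb-shipper" {
			return p.ObjectStore
		}
	}
	return ""
}

func main() {
	periods := []schemaPeriod{
		{IndexType: "boltdb-shipper", ObjectStore: "gcs"},
		{IndexType: "tsdb", ObjectStore: "s3"},
	}
	fmt.Println(resolveDeleteRequestStore("", "", periods))
	fmt.Println(resolveDeleteRequestStore("", "gcs", periods))
	fmt.Println(resolveDeleteRequestStore("azure", "gcs", periods))
}
```

Falling back to the latest period keeps pending delete requests reachable for users who never set either option, since that is where earlier releases would have stored them.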

**Which issue(s) this PR fixes:**
Fixes #7276

**Checklist**
- [X] Reviewed the `CONTRIBUTING.md` guide
- [X] Documentation added
- [X] Tests updated
- [X] `CHANGELOG.md` updated
- [X] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`

---------

Signed-off-by: Ashwanth Goli <iamashwanth@gmail.com>
ashwanthgoli committed Apr 25, 2023
1 parent 6f1d1d7 commit 52cd0a3
Showing 13 changed files with 421 additions and 191 deletions.
9 changes: 8 additions & 1 deletion docs/sources/configuration/_index.md
@@ -2014,7 +2014,9 @@ The `compactor` block configures the compactor component, which compacts index s
[working_directory: <string> | default = ""]

# The shared store used for storing boltdb files. Supported types: gcs, s3,
# azure, swift, filesystem, bos, cos.
# azure, swift, filesystem, bos, cos. If not set, compactor will be initialized
# to operate on all the object stores that contain either boltdb-shipper or tsdb
# index.
# CLI flag: -boltdb.shipper.compactor.shared-store
[shared_store: <string> | default = ""]

@@ -2051,6 +2053,11 @@ The `compactor` block configures the compactor component, which compacts index s
# CLI flag: -boltdb.shipper.compactor.retention-table-timeout
[retention_table_timeout: <duration> | default = 0s]

# Store used for managing delete requests. Defaults to
# -boltdb.shipper.compactor.shared-store.
# CLI flag: -boltdb.shipper.compactor.delete-request-store
[delete_request_store: <string> | default = ""]

# The max number of delete requests to run per compaction cycle.
# CLI flag: -boltdb.shipper.compactor.delete-batch-size
[delete_batch_size: <int> | default = 70]
29 changes: 25 additions & 4 deletions docs/sources/upgrading/_index.md
@@ -35,13 +35,38 @@ The output is incredibly verbose as it shows the entire internal config struct u

### Loki

#### Index shipper multi-store support
In previous releases, if you did not explicitly configure `-boltdb.shipper.shared-store`, `-tsdb.shipper.shared-store`, those values default to the `object_store` configured in the latest `period_config` of the corresponding index type.
These defaults are removed in favor of uploading indexes to multiple stores. If you do not explicitly configure a `shared-store`, the boltdb and tsdb indexes will be shipped to the `object_store` configured for that period.

#### Shutdown marker file

A shutdown marker file can be written by the `/ingester/prepare_shutdown` endpoint.
If the new `ingester.shutdown_marker_path` config setting has a value that value is used.
If not the`common.path_prefix` config setting is used if it has a value. Otherwise a warning is shown
in the logs on startup and the `/ingester/prepare_shutdown` endpoint will return a 500 status code.

#### Compactor multi-store support

In previous releases, setting `-boltdb.shipper.compactor.shared-store` configured the following:
- store used for managing delete requests.
- store on which index compaction should be performed.

If `-boltdb.shipper.compactor.shared-store` was not set, it used to default to the `object_store` configured in the latest `period_config` that uses either the tsdb or boltdb-shipper index.

Compactor now supports index compaction on multiple buckets/object stores.
And going forward loki will not set any defaults on `-boltdb.shipper.compactor.shared-store`, this has a couple of side effects detailed as follows:

##### store on which index compaction should be performed:
If `-boltdb.shipper.compactor.shared-store` is configured by the user, loki would run index compaction only on the store specified by the config.
If not set, compaction would be performed on all the object stores that contain either a boltdb-shipper or tsdb index.

##### store used for managing delete requests:
A new config option `-boltdb.shipper.compactor.delete-request-store` decides where delete requests should be stored. This new option takes precedence over `-boltdb.shipper.compactor.shared-store`.

In the case where neither of these options are set, the `object_store` configured in the latest `period_config` that uses either a tsdb or boltdb-shipper index is used for storing delete requests to ensure pending requests are processed.


## 2.8.0

### Loki
@@ -150,10 +175,6 @@ level=info ts=2022-12-20T15:27:54.858554127Z caller=metrics.go:147 component=fro

These statistics are also displayed when using `--stats` with LogCLI.

#### Index shipper multi-store support
In releases prior to 2.8.1, if you did not explicitly configure `-boltdb.shipper.shared-store`, `-tsdb.shipper.shared-store`, those values default to the `object_store` configured in the latest `period_config` of the corresponding index type.
In releases 2.8.1 and later, these defaults are removed in favor of uploading indexes to multiple stores. If you do not explicitly configure a `shared-store`, the boltdb and tsdb indexes will be shipped to the `object_store` configured for that period.

## 2.7.0

### Loki
1 change: 0 additions & 1 deletion integration/cluster/cluster.go
@@ -78,7 +78,6 @@ storage_config:
compactor:
working_directory: {{.dataPath}}/retention
shared_store: filesystem
retention_enabled: true
analytics:
36 changes: 23 additions & 13 deletions integration/loki_micro_services_delete_test.go
@@ -17,7 +17,7 @@ import (

func TestMicroServicesDeleteRequest(t *testing.T) {
storage.ResetBoltDBIndexClientsWithShipper()
clu := cluster.New(nil)
clu := cluster.New(nil, cluster.WithAdditionalBoltDBPeriod)
defer func() {
assert.NoError(t, clu.Cleanup())
storage.ResetBoltDBIndexClientsWithShipper()
@@ -97,11 +97,11 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
},
Values: [][]string{
{
strconv.FormatInt(now.Add(-45*time.Minute).UnixNano(), 10),
strconv.FormatInt(now.Add(-46*time.Hour).UnixNano(), 10),
"lineA",
},
{
strconv.FormatInt(now.Add(-45*time.Minute).UnixNano(), 10),
strconv.FormatInt(now.Add(-46*time.Hour).UnixNano(), 10),
"lineB",
},
{
@@ -118,32 +118,32 @@ func TestMicroServicesDeleteRequest(t *testing.T) {

expectedDeleteRequests := []client.DeleteRequest{
{
StartTime: now.Add(-time.Hour).Unix(),
StartTime: now.Add(-48 * time.Hour).Unix(),
EndTime: now.Unix(),
Query: `{deletion_type="filter"} |= "lineB"`,
Status: "received",
},
{
StartTime: now.Add(-time.Hour).Unix(),
StartTime: now.Add(-48 * time.Hour).Unix(),
EndTime: now.Unix(),
Query: `{deletion_type="filter_no_match"} |= "foo"`,
Status: "received",
},
{
StartTime: now.Add(-time.Hour).Unix(),
StartTime: now.Add(-48 * time.Hour).Unix(),
EndTime: now.Add(-10 * time.Minute).Unix(),
Query: `{deletion_type="partially_by_time"}`,
Status: "received",
},
{
StartTime: now.Add(-time.Hour).Unix(),
StartTime: now.Add(-48 * time.Hour).Unix(),
EndTime: now.Unix(),
Query: `{deletion_type="whole"}`,
Status: "received",
},
}

validateQueryResponse := func(resp *client.Response) {
validateQueryResponse := func(expectedStreams []client.StreamValues, resp *client.Response) {
t.Helper()
assert.Equal(t, "streams", resp.Data.ResultType)

@@ -173,7 +173,14 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
t.Run("query", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
validateQueryResponse(resp)

// given default value of query_ingesters_within is 3h, older samples won't be present in the response
var es []client.StreamValues
for _, stream := range expectedStreams {
stream.Values = stream.Values[2:]
es = append(es, stream)
}
validateQueryResponse(es, resp)
})

t.Run("flush-logs-and-restart-ingester-querier", func(t *testing.T) {
@@ -195,7 +202,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
t.Run("query again to verify logs being served from storage", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
validateQueryResponse(resp)
validateQueryResponse(expectedStreams, resp)
})

t.Run("add-delete-requests", func(t *testing.T) {
@@ -230,7 +237,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)

validateQueryResponse(resp)
validateQueryResponse(expectedStreams, resp)
})

// Wait until delete request is finished
@@ -264,7 +271,10 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
metrics, err := cliCompactor.Metrics()
require.NoError(t, err)
checkUserLabelAndMetricValue(t, "loki_compactor_delete_requests_processed_total", metrics, tenantID, float64(len(expectedDeleteRequests)))
checkUserLabelAndMetricValue(t, "loki_compactor_deleted_lines", metrics, tenantID, 1)

// ideally this metric should be equal to 1 given that a single line matches the line filter
// but the same chunk is indexed in 3 tables
checkUserLabelAndMetricValue(t, "loki_compactor_deleted_lines", metrics, tenantID, 3)
})

// Query lines
@@ -284,7 +294,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)

validateQueryResponse(resp)
validateQueryResponse(expectedStreams, resp)
})
}

14 changes: 4 additions & 10 deletions pkg/loki/config_wrapper.go
@@ -445,7 +445,6 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.Azure = r.Common.Storage.Azure
r.StorageConfig.AzureStorageConfig = r.Common.Storage.Azure
r.StorageConfig.Hedging = r.Common.Storage.Hedging
r.CompactorConfig.SharedStoreType = config.StorageTypeAzure
}
}

@@ -460,7 +459,6 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.Type = "local"
r.Ruler.StoreConfig.Local = local.Config{Directory: r.Common.Storage.FSConfig.RulesDirectory}
r.StorageConfig.FSConfig.Directory = r.Common.Storage.FSConfig.ChunksDirectory
r.CompactorConfig.SharedStoreType = config.StorageTypeFileSystem
}
}

@@ -471,7 +469,6 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.Type = "gcs"
r.Ruler.StoreConfig.GCS = r.Common.Storage.GCS
r.StorageConfig.GCSConfig = r.Common.Storage.GCS
r.CompactorConfig.SharedStoreType = config.StorageTypeGCS
r.StorageConfig.Hedging = r.Common.Storage.Hedging
}
}
@@ -483,7 +480,6 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.Type = "s3"
r.Ruler.StoreConfig.S3 = r.Common.Storage.S3
r.StorageConfig.AWSStorageConfig.S3Config = r.Common.Storage.S3
r.CompactorConfig.SharedStoreType = config.StorageTypeS3
r.StorageConfig.Hedging = r.Common.Storage.Hedging
}
}
@@ -494,7 +490,6 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.Type = "bos"
r.Ruler.StoreConfig.BOS = r.Common.Storage.BOS
r.StorageConfig.BOSStorageConfig = r.Common.Storage.BOS
r.CompactorConfig.SharedStoreType = config.StorageTypeBOS
}
}

@@ -505,7 +500,6 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.Type = "swift"
r.Ruler.StoreConfig.Swift = r.Common.Storage.Swift
r.StorageConfig.Swift = r.Common.Storage.Swift
r.CompactorConfig.SharedStoreType = config.StorageTypeSwift
r.StorageConfig.Hedging = r.Common.Storage.Hedging
}
}
@@ -522,8 +516,8 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
}

func betterBoltdbShipperDefaults(cfg, defaults *ConfigWrapper, period config.PeriodConfig) {
if cfg.CompactorConfig.SharedStoreType == defaults.CompactorConfig.SharedStoreType {
cfg.CompactorConfig.SharedStoreType = period.ObjectType
if cfg.CompactorConfig.DefaultDeleteRequestStore == defaults.CompactorConfig.DefaultDeleteRequestStore {
cfg.CompactorConfig.DefaultDeleteRequestStore = period.ObjectType
}

if cfg.Common.PathPrefix != "" {
@@ -540,8 +534,8 @@ func betterBoltdbShipperDefaults(cfg, defaults *ConfigWrapper, period config.Per
}

func betterTSDBShipperDefaults(cfg, defaults *ConfigWrapper, period config.PeriodConfig) {
if cfg.CompactorConfig.SharedStoreType == defaults.CompactorConfig.SharedStoreType {
cfg.CompactorConfig.SharedStoreType = period.ObjectType
if cfg.CompactorConfig.DefaultDeleteRequestStore == defaults.CompactorConfig.DefaultDeleteRequestStore {
cfg.CompactorConfig.DefaultDeleteRequestStore = period.ObjectType
}

if cfg.Common.PathPrefix != "" {
68 changes: 0 additions & 68 deletions pkg/loki/config_wrapper_test.go
@@ -720,58 +720,6 @@ storage_config:
assert.EqualValues(t, 5*time.Minute, config.StorageConfig.GCSConfig.RequestTimeout)
})

t.Run("when common object store config is provided, compactor shared store is defaulted to use it", func(t *testing.T) {
for _, tt := range []struct {
configString string
expected string
}{
{
configString: `common:
storage:
s3:
s3: s3://foo-bucket/example
access_key_id: abc123
secret_access_key: def789`,
expected: config.StorageTypeS3,
},
{
configString: `common:
storage:
gcs:
bucket_name: foobar`,
expected: config.StorageTypeGCS,
},
{
configString: `common:
storage:
azure:
account_name: 3rd_planet
account_key: water`,
expected: config.StorageTypeAzure,
},
{
configString: `common:
storage:
swift:
username: steve
password: supersecret`,
expected: config.StorageTypeSwift,
},
{
configString: `common:
storage:
filesystem:
chunks_directory: /tmp/chunks
rules_directory: /tmp/rules`,
expected: config.StorageTypeFileSystem,
},
} {
config, _ := testContext(tt.configString, nil)

assert.Equal(t, tt.expected, config.CompactorConfig.SharedStoreType)
}
})

t.Run("explicit compactor shared_store config is preserved", func(t *testing.T) {
configString := `common:
storage:
@@ -788,22 +736,6 @@ compactor:
})

t.Run("when using boltdb storage type", func(t *testing.T) {
t.Run("default compactor.shared_store to the value of current_schema.object_store", func(t *testing.T) {
const boltdbSchemaConfig = `---
schema_config:
configs:
- from: 2021-08-01
store: boltdb-shipper
object_store: gcs
schema: v11
index:
prefix: index_
period: 24h`
cfg, _ := testContext(boltdbSchemaConfig, nil)

assert.Equal(t, config.StorageTypeGCS, cfg.CompactorConfig.SharedStoreType)
})

t.Run("shared store types provided via config file take precedence", func(t *testing.T) {
const boltdbSchemaConfig = `---
schema_config: