index-shipper: add support for multiple stores (#7754)
Signed-off-by: Ashwanth Goli <iamashwanth@gmail.com>

**What this PR does / why we need it**:
Currently, Loki initializes a single instance of index-shipper to [handle
all the table
ranges](https://github.com/grafana/loki/blob/ff7b46297345b215fbf49c2cd4c364d125b6290b/pkg/storage/factory.go#L188)
(from across periods) for a given index type (`boltdb-shipper` or `tsdb`).
Since index-shipper only has the object client handle to the store
defined by `shared_store_type`, it limits the index uploads to a single
store. Setting `shared_store_type` to a different store at a later point
in time would mean losing access to the indexes stored in the previously
configured store.

With this PR, we initialize a separate index-shipper & table manager for
each period if `shared_store_type` is not explicitly configured. This
offers the flexibility to store indexes in multiple stores (across
providers).
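
To illustrate the per-period idea, here is a minimal Go sketch of how a timestamp could be mapped to the period (and therefore the object store) that owns it. `PeriodConfig` and `periodFor` are hypothetical names made up for this example and are not Loki's actual types; the dates and store names are also assumptions.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// PeriodConfig is a hypothetical, trimmed-down stand-in for a schema period.
// It is not the real Loki type.
type PeriodConfig struct {
	From        time.Time // start of the period
	IndexType   string    // e.g. "boltdb-shipper" or "tsdb"
	ObjectStore string    // store that holds this period's index
}

// periodFor returns the latest period that starts at or before ts.
// periods must be sorted by From in ascending order.
func periodFor(periods []PeriodConfig, ts time.Time) (PeriodConfig, bool) {
	// Find the first period that starts strictly after ts.
	i := sort.Search(len(periods), func(i int) bool {
		return periods[i].From.After(ts)
	})
	if i == 0 {
		return PeriodConfig{}, false // ts is older than every period
	}
	return periods[i-1], true
}

func day(y int, m time.Month, d int) time.Time {
	return time.Date(y, m, d, 0, 0, 0, 0, time.UTC)
}

func main() {
	periods := []PeriodConfig{
		{From: day(2023, 1, 1), IndexType: "boltdb-shipper", ObjectStore: "filesystem"},
		{From: day(2023, 4, 1), IndexType: "tsdb", ObjectStore: "store-1"},
	}
	for _, ts := range []time.Time{day(2023, 2, 15), day(2023, 4, 10)} {
		if p, ok := periodFor(periods, ts); ok {
			fmt.Println(ts.Format("2006-01-02"), p.IndexType, p.ObjectStore)
		}
	}
}
```

With one shipper per period, each lookup like this would land on a shipper that holds a client for that period's store, rather than on a single shared client.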

**Note**:
- usage of `shared_store_type` in this commit text refers to one of
these config options depending on the index in use:
`-boltdb.shipper.shared-store`, `-tsdb.shipper.shared-store`
- `shared_store_type` used to default to the `object_store` from the
latest `period_config` if not explicitly configured. This PR removes
these defaults in favor of supporting index uploads to multiple stores.
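
The change in defaults described above can be sketched roughly as follows. This is a simplified illustration, not the code from this PR: `Period`, `legacyStore` and `storeFor` are hypothetical names, and the sketch ignores the detail that the old default was taken from the latest period of the corresponding index type.

```go
package main

import "fmt"

// Period is a hypothetical stand-in for one schema period.
type Period struct {
	From        string
	ObjectStore string
}

// legacyStore mimics the old default: when no shared store is set,
// every period uses the object store of the latest period.
func legacyStore(sharedStore string, periods []Period) string {
	if sharedStore != "" {
		return sharedStore
	}
	if len(periods) == 0 {
		return ""
	}
	return periods[len(periods)-1].ObjectStore
}

// storeFor mimics the new behaviour: when no shared store is set,
// each period uses its own object store.
func storeFor(sharedStore string, p Period) string {
	if sharedStore != "" {
		return sharedStore
	}
	return p.ObjectStore
}

func main() {
	periods := []Period{
		{From: "2023-01-01", ObjectStore: "filesystem"},
		{From: "2023-04-01", ObjectStore: "store-1"},
	}
	for _, p := range periods {
		fmt.Printf("%s: before=%s after=%s\n",
			p.From, legacyStore("", periods), storeFor("", p))
	}
}
```

In both variants an explicitly configured shared store still wins, which matches the note that only the implicit default is removed.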

**Which issue(s) this PR fixes**:
Fixes #7276

**Special notes for your reviewer**:
All instances of the downloads table manager operate on the same
cacheDir. This should not be a problem, because the tableRanges do not
overlap across periods.

**Checklist**
- [X] Reviewed the `CONTRIBUTING.md` guide
- [ ] Documentation added
- [X] Tests updated
- [x] `CHANGELOG.md` updated
- [x] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`

---------

Signed-off-by: Ashwanth Goli <iamashwanth@gmail.com>
Co-authored-by: J Stickler <julie.stickler@grafana.com>
ashwanthgoli and JStickler committed Apr 14, 2023
1 parent c4261b1 commit 5cef03d
Showing 34 changed files with 1,497 additions and 675 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -86,6 +86,7 @@
* [8271](https://github.com/grafana/loki/pull/8271) **kavirajk**: logql: Support urlencode and urldecode template functions
* [8259](https://github.com/grafana/loki/pull/8259) **mar4uk**: Extract push.proto from the logproto package to the separate module.
* [7906](https://github.com/grafana/loki/pull/7906) **kavirajk**: Add API endpoint that formats LogQL expressions and support new `fmt` subcommand in `logcli` to format LogQL query.
* [7754](https://github.com/grafana/loki/pull/7754) **ashwanthgoli** index-shipper: add support for multiple stores.
* [6675](https://github.com/grafana/loki/pull/6675) **btaani**: Add logfmt expression parser for selective extraction of labels from logfmt formatted logs
* [8474](https://github.com/grafana/loki/pull/8474) **farodin91**: Add support for short-lived S3 session tokens
* [8774](https://github.com/grafana/loki/pull/8774) **slim-bean**: Add new logql template functions `bytes`, `duration`, `unixEpochMillis`, `unixEpochNanos`, `toDateInZone`, `b64Enc`, and `b64Dec`
4 changes: 4 additions & 0 deletions docs/sources/upgrading/_index.md
@@ -141,6 +141,10 @@ level=info ts=2022-12-20T15:27:54.858554127Z caller=metrics.go:147 component=fro

These statistics are also displayed when using `--stats` with LogCLI.

#### Index shipper multi-store support
In releases prior to 2.8.1, if you did not explicitly configure `-boltdb.shipper.shared-store`, `-tsdb.shipper.shared-store`, those values default to the `object_store` configured in the latest `period_config` of the corresponding index type.
In releases 2.8.1 and later, these defaults are removed in favor of uploading indexes to multiple stores. If you do not explicitly configure a `shared-store`, the boltdb and tsdb indexes will be shipped to the `object_store` configured for that period.

## 2.7.0

### Loki
2 changes: 1 addition & 1 deletion integration/client/client.go
@@ -456,7 +456,7 @@ func (c *Client) parseResponse(buf []byte, statusCode int) (*Response, error) {
func (c *Client) rangeQueryURL(query string) string {
v := url.Values{}
v.Set("query", query)
- v.Set("start", formatTS(c.Now.Add(-2*time.Hour)))
+ v.Set("start", formatTS(c.Now.Add(-7*24*time.Hour)))
v.Set("end", formatTS(c.Now.Add(time.Second)))

u, err := url.Parse(c.baseURL)
65 changes: 50 additions & 15 deletions integration/cluster/cluster.go
@@ -20,11 +20,13 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/multierror"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"gopkg.in/yaml.v2"

"github.com/grafana/loki/integration/util"

"github.com/grafana/loki/pkg/loki"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util/cfg"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
@@ -60,22 +62,19 @@ limits_config:
per_stream_rate_limit_burst: 50MB
ingestion_rate_mb: 50
ingestion_burst_size_mb: 50
reject_old_samples: false
storage_config:
named_stores:
filesystem:
store-1:
directory: {{.sharedDataPath}}/fs-store-1
boltdb_shipper:
shared_store: filesystem
active_index_directory: {{.dataPath}}/index
cache_location: {{.dataPath}}/boltdb-cache
schema_config:
configs:
- from: 2020-10-24
store: boltdb-shipper
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h
tsdb_shipper:
active_index_directory: {{.dataPath}}/tsdb-index
cache_location: {{.dataPath}}/tsdb-cache
compactor:
working_directory: {{.dataPath}}/retention
@@ -92,6 +91,9 @@ ingester:
querier:
multi_tenant_queries_enabled: true
query_scheduler:
max_outstanding_requests_per_tenant: 2048
ruler:
enable_api: true
ring:
@@ -104,7 +106,6 @@ ruler:
local:
directory: {{.sharedDataPath}}/rules
rule_path: {{.sharedDataPath}}/prom-rule
`))
)

@@ -139,12 +140,14 @@ func (w *wrappedRegisterer) MustRegister(collectors ...prometheus.Collector) {

type Cluster struct {
sharedPath string
overridesFile string
components []*Component
waitGroup sync.WaitGroup
initedAt model.Time
periodCfgs []string
overridesFile string
}

func New(logLevel level.Value) *Cluster {
func New(logLevel level.Value, opts ...func(*Cluster)) *Cluster {
if logLevel != nil {
util_log.Logger = level.NewFilter(log.NewLogfmtLogger(os.Stderr), level.Allow(logLevel))
}
@@ -162,10 +165,17 @@ func New(logLevel level.Value) *Cluster {
panic(fmt.Errorf("error creating overrides file: %w", err))
}

return &Cluster{
cluster := &Cluster{
sharedPath: sharedPath,
initedAt: model.Now(),
overridesFile: overridesFile,
}

for _, opt := range opts {
opt(cluster)
}

return cluster
}

func (c *Cluster) Run() error {
@@ -266,6 +276,11 @@ func (c *Component) ClusterSharedPath() string {
return c.cluster.sharedPath
}

// component should be restarted if it's already running for the new flags to take effect
func (c *Component) AddFlags(flags ...string) {
c.flags = append(c.flags, flags...)
}

func (c *Component) HTTPURL() string {
return fmt.Sprintf("http://localhost:%s", port(c.loki.Server.HTTPListenAddr().String()))
}
@@ -320,6 +335,9 @@ func (c *Component) writeConfig() error {
func (c *Component) MergedConfig() ([]byte, error) {
var sb bytes.Buffer

periodStart := config.DayTime{Time: c.cluster.initedAt.Add(-24 * time.Hour)}
additionalPeriodStart := config.DayTime{Time: c.cluster.initedAt.Add(-7 * 24 * time.Hour)}

if err := configTemplate.Execute(&sb, map[string]interface{}{
"dataPath": c.dataPath,
"sharedDataPath": c.cluster.sharedPath,
@@ -330,6 +348,23 @@ func (c *Component) MergedConfig() ([]byte, error) {
merger := util.NewYAMLMerger()
merger.AddFragment(sb.Bytes())

// default to using boltdb index
if len(c.cluster.periodCfgs) == 0 {
c.cluster.periodCfgs = []string{boltDBShipperSchemaConfigTemplate}
}

for _, periodCfg := range c.cluster.periodCfgs {
var buf bytes.Buffer
if err := template.Must(template.New("schema").Parse(periodCfg)).
Execute(&buf, map[string]interface{}{
"curPeriodStart": periodStart.String(),
"additionalPeriodStart": additionalPeriodStart.String(),
}); err != nil {
return nil, errors.New("error building schema_config")
}
merger.AddFragment(buf.Bytes())
}

for _, extra := range c.extraConfigs {
merger.AddFragment([]byte(extra))
}
61 changes: 61 additions & 0 deletions integration/cluster/schema.go
@@ -0,0 +1,61 @@
package cluster

var (
boltDBShipperSchemaConfigTemplate = `
schema_config:
configs:
- from: {{.curPeriodStart}}
store: boltdb-shipper
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h
`
additionalBoltDBShipperSchemaConfigTemplate = `
schema_config:
configs:
- from: {{.additionalPeriodStart}}
store: boltdb-shipper
object_store: store-1
schema: v11
index:
prefix: index_
period: 24h
`

tsdbShipperSchemaConfigTemplate = `
schema_config:
configs:
- from: {{.curPeriodStart}}
store: tsdb
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h
`
additionalTSDBShipperSchemaConfigTemplate = `
schema_config:
configs:
- from: {{.additionalPeriodStart}}
store: tsdb
object_store: store-1
schema: v11
index:
prefix: index_
period: 24h
`
)

func WithAdditionalBoltDBPeriod(c *Cluster) {
c.periodCfgs = append(c.periodCfgs, additionalBoltDBShipperSchemaConfigTemplate, boltDBShipperSchemaConfigTemplate)
}

func WithAdditionalTSDBPeriod(c *Cluster) {
c.periodCfgs = append(c.periodCfgs, additionalTSDBShipperSchemaConfigTemplate, tsdbShipperSchemaConfigTemplate)
}

func WithBoltDBAndTSDBPeriods(c *Cluster) {
c.periodCfgs = append(c.periodCfgs, additionalBoltDBShipperSchemaConfigTemplate, tsdbShipperSchemaConfigTemplate)
}
10 changes: 5 additions & 5 deletions integration/loki_micro_services_delete_test.go
@@ -12,15 +12,15 @@ import (

"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"

"github.com/grafana/loki/pkg/storage"
)

func TestMicroServicesDeleteRequest(t *testing.T) {
storage.ResetBoltDBIndexClientsWithShipper()
clu := cluster.New(nil)
defer func() {
assert.NoError(t, clu.Cleanup())
storage.ResetBoltDBIndexClientWithShipper()
storage.ResetBoltDBIndexClientsWithShipper()
}()

// initially, run only compactor, index-gateway and distributor.
@@ -187,7 +187,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
checkMetricValue(t, "loki_ingester_chunks_flushed_total", metrics, 5)

// reset boltdb-shipper client and restart querier
storage.ResetBoltDBIndexClientWithShipper()
storage.ResetBoltDBIndexClientsWithShipper()
require.NoError(t, tQuerier.Restart())
})

@@ -218,7 +218,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
// Query lines
t.Run("verify query time filtering", func(t *testing.T) {
// reset boltdb-shipper client and restart querier
storage.ResetBoltDBIndexClientWithShipper()
storage.ResetBoltDBIndexClientsWithShipper()
require.NoError(t, tQuerier.Restart())

// update expectedStreams as per the issued requests
@@ -275,7 +275,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
require.NoError(t, tQuerier.SetTenantLimits(tenantID, tenantLimits))

// restart querier to make it sync the index
storage.ResetBoltDBIndexClientWithShipper()
storage.ResetBoltDBIndexClientsWithShipper()
require.NoError(t, tQuerier.Restart())

// ensure the deletion-mode limit is updated