config: better defaults for querier max concurrency (#10785)
**What this PR does / why we need it**:
To simplify Loki configuration, this PR removes
`querier.worker-parallelism`, as it does not offer additional value over
the already existing `querier.max-concurrent`.

It also updates the default value of `querier.max-concurrent` to 4. This is
a better default for users who are just starting out; they can
consider increasing it as they scale up their Loki installation.
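
For reference, a minimal sketch of the single knob that remains after this change; the value shown is simply the new default, and the YAML key mirrors the `-querier.max-concurrent` CLI flag:

```yaml
querier:
  # Maximum number of queries this querier processes at once.
  # New default is 4; increase it as the querier gets more CPU,
  # keeping in mind that very high values risk out-of-memory errors.
  max_concurrent: 4
```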

**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:

**Checklist**
- [X] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [X] Documentation added
- [X] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [X] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
ashwanthgoli committed Oct 10, 2023
1 parent 1715e64 commit 0be1913
Showing 9 changed files with 37 additions and 74 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@
* [10709](https://github.com/grafana/loki/pull/10709) **chaudum**/**salvacorts** Remove `ingester.max-transfer-retries` configuration option in favor of using the WAL.
* [10736](https://github.com/grafana/loki/pull/10736) **ashwanthgoli** Deprecate write dedupe cache as this is not required by the newer single store indexes (tsdb and boltdb-shipper).
* [10693](https://github.com/grafana/loki/pull/10693) **ashwanthgoli** Embedded cache: Updates the metric prefix from `querier_cache_` to `loki_embeddedcache_` and removes duplicate metrics.
+* [10785](https://github.com/grafana/loki/pull/10785) **ashwanthgoli** Config: Removes `querier.worker-parallelism` and updates default value of `querier.max-concurrent` to 4.

##### Fixes

15 changes: 3 additions & 12 deletions docs/sources/configure/_index.md
@@ -544,9 +544,10 @@ engine:
# CLI flag: -querier.engine.max-lookback-period
[max_look_back_period: <duration> | default = 30s]

-# The maximum number of concurrent queries allowed.
+# The maximum number of queries that can be simultaneously processed by the
+# querier.
# CLI flag: -querier.max-concurrent
-[max_concurrent: <int> | default = 10]
+[max_concurrent: <int> | default = 4]

# Only query the store, and not attempt any ingesters. This is useful for
# running a standalone querier pool operating only against stored data.
@@ -2759,16 +2760,6 @@ The `frontend_worker` configures the worker - running within the Loki querier -
# CLI flag: -querier.dns-lookup-period
[dns_lookup_duration: <duration> | default = 3s]

-# Number of simultaneous queries to process per query-frontend or
-# query-scheduler.
-# CLI flag: -querier.worker-parallelism
-[parallelism: <int> | default = 10]
-
-# Force worker concurrency to match the -querier.max-concurrent option.
-# Overrides querier.worker-parallelism.
-# CLI flag: -querier.worker-match-max-concurrent
-[match_max_concurrent: <boolean> | default = true]
-
# Querier ID, sent to frontend service to identify requests from the same
# querier. Defaults to hostname.
# CLI flag: -querier.id
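For readers updating their configs, a hedged sketch of what a `frontend_worker` block might look like after the removal; `scheduler_address` is illustrative, and the remaining keys are the ones still documented in this section:

```yaml
frontend_worker:
  # Illustrative address; use frontend_address instead if you run
  # a query-frontend without a scheduler.
  scheduler_address: query-scheduler.loki.svc:9095
  dns_lookup_duration: 3s
  # parallelism and match_max_concurrent are gone; worker concurrency
  # is now derived from querier.max_concurrent.
```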
18 changes: 11 additions & 7 deletions docs/sources/setup/upgrade/_index.md
@@ -48,13 +48,15 @@ The previous default value `false` is applied.
#### Deprecated configuration options are removed

1. Removes already deprecated `-querier.engine.timeout` CLI flag and the corresponding YAML setting.
-2. Also removes the `query_timeout` from the querier YAML section. Instead of configuring `query_timeout` under `querier`, you now configure it in [Limits Config](/docs/loki/latest/configuration/#limits_config).
-3. `s3.sse-encryption` is removed. AWS now defaults encryption of all buckets to SSE-S3. Use `sse.type` to set SSE type.
-4. `ruler.wal-cleaer.period` is removed. Use `ruler.wal-cleaner.period` instead.
-5. `experimental.ruler.enable-api` is removed. Use `ruler.enable-api` instead.
-6. `split_queries_by_interval` is removed from `query_range` YAML section. You can instead configure it in [Limits Config](/docs/loki/latest/configuration/#limits_config).
-7. `frontend.forward-headers-list` CLI flag and its corresponding YAML setting are removed.
-8. `frontend.cache-split-interval` CLI flag is removed. Results caching interval is now determined by `querier.split-queries-by-interval`.
+1. Also removes the `query_timeout` from the querier YAML section. Instead of configuring `query_timeout` under `querier`, you now configure it in [Limits Config](/docs/loki/latest/configuration/#limits_config).
+1. `s3.sse-encryption` is removed. AWS now defaults encryption of all buckets to SSE-S3. Use `sse.type` to set SSE type.
+1. `ruler.wal-cleaer.period` is removed. Use `ruler.wal-cleaner.period` instead.
+1. `experimental.ruler.enable-api` is removed. Use `ruler.enable-api` instead.
+1. `split_queries_by_interval` is removed from `query_range` YAML section. You can instead configure it in [Limits Config](/docs/loki/latest/configuration/#limits_config).
+1. `frontend.forward-headers-list` CLI flag and its corresponding YAML setting are removed.
+1. `frontend.cache-split-interval` CLI flag is removed. Results caching interval is now determined by `querier.split-queries-by-interval`.
+1. `querier.worker-parallelism` CLI flag and its corresponding YAML setting are now removed, as they do not offer additional value over the already existing `querier.max-concurrent`.
+We recommend configuring `querier.max-concurrent` to limit the maximum number of concurrent requests processed by the queriers.

#### Legacy ingester shutdown handler is removed

@@ -76,6 +78,8 @@ This new metric will provide a more clear signal that there is an issue with ing

#### Changes to default configuration values

+1. `querier.max-concurrent` now defaults to 4. Consider increasing this if queriers have access to more CPU resources.
+Note that you risk running into out of memory errors if you set this to a very high value.
1. `frontend.embedded-cache.max-size-mb` Embedded results cache size now defaults to 100MB.

#### Write dedupe cache is deprecated
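To make the upgrade notes above concrete, a before/after sketch of the relevant YAML; values are illustrative, and the removed keys are exactly the ones this PR deletes:

```yaml
# Before: worker concurrency was tuned separately from the querier.
frontend_worker:
  parallelism: 10
  match_max_concurrent: true
querier:
  max_concurrent: 10

# After: drop the frontend_worker keys; the querier setting drives both.
querier:
  max_concurrent: 4
```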
5 changes: 2 additions & 3 deletions pkg/loki/modules.go
@@ -332,8 +332,8 @@ func (t *Loki) initQuerier() (services.Service, error) {
if t.Cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 {
t.Cfg.Querier.IngesterQueryStoreMaxLookback = t.Cfg.Ingester.QueryStoreMaxLookBackPeriod
}
-// Querier worker's max concurrent requests must be the same as the querier setting
-t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
+// Querier worker's max concurrent must be the same as the querier setting
+t.Cfg.Worker.MaxConcurrent = t.Cfg.Querier.MaxConcurrent

deleteStore, err := t.deleteRequestsClient("querier", t.Overrides)
if err != nil {
@@ -357,7 +357,6 @@ func (t *Loki) initQuerier() (services.Service, error) {
ReadEnabled: t.Cfg.isModuleEnabled(Read),
GrpcListenAddress: t.Cfg.Server.GRPCListenAddress,
GrpcListenPort: t.Cfg.Server.GRPCListenPort,
-QuerierMaxConcurrent: t.Cfg.Querier.MaxConcurrent,
QuerierWorkerConfig: &t.Cfg.Worker,
QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend),
QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler),
4 changes: 1 addition & 3 deletions pkg/lokifrontend/frontend/v1/frontend_test.go
@@ -227,9 +227,7 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a

var workerConfig querier_worker.Config
flagext.DefaultValues(&workerConfig)
-workerConfig.Parallelism = 1
-workerConfig.MatchMaxConcurrency = matchMaxConcurrency
-workerConfig.MaxConcurrentRequests = 1
+workerConfig.MaxConcurrent = 1

// localhost:0 prevents firewall warnings on Mac OS X.
grpcListen, err := net.Listen("tcp", "localhost:0")
2 changes: 1 addition & 1 deletion pkg/querier/querier.go
@@ -67,7 +67,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.TailMaxDuration, "querier.tail-max-duration", 1*time.Hour, "Maximum duration for which the live tailing requests are served.")
f.DurationVar(&cfg.ExtraQueryDelay, "querier.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 3*time.Hour, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 10, "The maximum number of concurrent queries allowed.")
f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 4, "The maximum number of queries that can be simultaneously processed by the querier.")
f.BoolVar(&cfg.QueryStoreOnly, "querier.query-store-only", false, "Only query the store, and not attempt any ingesters. This is useful for running a standalone querier pool operating only against stored data.")
f.BoolVar(&cfg.QueryIngesterOnly, "querier.query-ingester-only", false, "When true, queriers only query the ingesters, and not stored data. This is useful when the object store is unavailable.")
f.BoolVar(&cfg.MultiTenantQueriesEnabled, "querier.multi-tenant-queries-enabled", false, "When true, allow queries to span multiple tenants.")
41 changes: 13 additions & 28 deletions pkg/querier/worker/worker.go
@@ -25,10 +25,7 @@ type Config struct {
FrontendAddress string `yaml:"frontend_address"`
SchedulerAddress string `yaml:"scheduler_address"`
DNSLookupPeriod time.Duration `yaml:"dns_lookup_duration"`
-
-Parallelism int `yaml:"parallelism"`
-MatchMaxConcurrency bool `yaml:"match_max_concurrent"`
-MaxConcurrentRequests int `yaml:"-"` // Must be same as passed to LogQL Engine.
+MaxConcurrent int `yaml:"-"` // same as querier.max-concurrent.

QuerierID string `yaml:"id"`

@@ -38,11 +35,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.SchedulerAddress, "querier.scheduler-address", "", "Hostname (and port) of scheduler that querier will periodically resolve, connect to and receive queries from. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.")
f.StringVar(&cfg.FrontendAddress, "querier.frontend-address", "", "Address of query frontend service, in host:port format. If -querier.scheduler-address is set as well, querier will use scheduler instead. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.")
-
f.DurationVar(&cfg.DNSLookupPeriod, "querier.dns-lookup-period", 3*time.Second, "How often to query DNS for query-frontend or query-scheduler address. Also used to determine how often to poll the scheduler-ring for addresses if the scheduler-ring is configured.")
-
-f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process per query-frontend or query-scheduler.")
-f.BoolVar(&cfg.MatchMaxConcurrency, "querier.worker-match-max-concurrent", true, "Force worker concurrency to match the -querier.max-concurrent option. Overrides querier.worker-parallelism.")
f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to frontend service to identify requests from the same querier. Defaults to hostname.")

cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f)
@@ -235,27 +228,23 @@ func (w *querierWorker) AddressRemoved(address string) {

// Must be called with lock.
func (w *querierWorker) resetConcurrency() {
-totalConcurrency := 0
+var (
+index, totalConcurrency int
+)

defer func() {
w.metrics.concurrentWorkers.Set(float64(totalConcurrency))
}()
-index := 0

for _, m := range w.managers {
-concurrency := 0
-
-if w.cfg.MatchMaxConcurrency {
-concurrency = w.cfg.MaxConcurrentRequests / len(w.managers)
-
-// If max concurrency does not evenly divide into our frontends a subset will be chosen
-// to receive an extra connection. Frontend addresses were shuffled above so this will be a
-// random selection of frontends.
-if index < w.cfg.MaxConcurrentRequests%len(w.managers) {
-level.Warn(w.logger).Log("msg", "max concurrency is not evenly divisible across targets, adding an extra connection", "addr", m.address)
-concurrency++
-}
-} else {
-concurrency = w.cfg.Parallelism
+concurrency := w.cfg.MaxConcurrent / len(w.managers)
+
+// If max concurrency does not evenly divide into our frontends a subset will be chosen
+// to receive an extra connection. Frontend addresses were shuffled above so this will be a
+// random selection of frontends.
+if index < w.cfg.MaxConcurrent%len(w.managers) {
+level.Warn(w.logger).Log("msg", "max concurrency is not evenly divisible across targets, adding an extra connection", "addr", m.address)
+concurrency++
+}

// If concurrency is 0 then MaxConcurrentRequests is less than the total number of
@@ -270,10 +259,6 @@ func (w *querierWorker) resetConcurrency() {
m.concurrency(concurrency)
index++
}
-
-if totalConcurrency > w.cfg.MaxConcurrentRequests {
-level.Warn(w.logger).Log("msg", "total worker concurrency is greater than logql max concurrency. Queries may be queued in the querier which reduces QOS")
-}
}

func (w *querierWorker) connect(ctx context.Context, address string) (*grpc.ClientConn, error) {
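The arithmetic in the new `resetConcurrency` is easy to check in isolation. Below is a standalone sketch (not the actual Loki function) of the same distribution logic, using the expectations from the tests that follow:

```go
package main

import "fmt"

// distributeConcurrency splits maxConcurrent across numTargets the way
// resetConcurrency does: an even share per target, the remainder spread
// over the first maxConcurrent%numTargets targets, and a floor of one
// connection per target so no frontend or scheduler is starved.
func distributeConcurrency(maxConcurrent, numTargets int) []int {
	out := make([]int, numTargets)
	for i := range out {
		c := maxConcurrent / numTargets
		if i < maxConcurrent%numTargets {
			c++ // a subset of targets absorbs the remainder
		}
		if c == 0 {
			c = 1 // always connect at least once per target
		}
		out[i] = c
	}
	return out
}

func main() {
	fmt.Println(distributeConcurrency(7, 4)) // [2 2 2 1]: total 7
	fmt.Println(distributeConcurrency(3, 6)) // [1 1 1 1 1 1]: floor of 1 pushes total to 6
}
```

The second example shows why a querier may open more connections than `querier.max-concurrent` when there are more targets than the configured concurrency, which the code comments call out as a risk of exceeding LogQL max concurrency.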
24 changes: 5 additions & 19 deletions pkg/querier/worker/worker_test.go
@@ -18,42 +18,30 @@ import (
func TestResetConcurrency(t *testing.T) {
tests := []struct {
name string
-parallelism int
maxConcurrent int
numTargets int
expectedConcurrency int
}{
{
-name: "Test create at least one processor per target",
-parallelism: 0,
+name: "create at least one processor per target",
maxConcurrent: 0,
numTargets: 2,
expectedConcurrency: 2,
},
-{
-name: "Test concurrency equal to parallelism * target when MatchMaxConcurrency is false",
-parallelism: 4,
-maxConcurrent: 0,
-numTargets: 2,
-expectedConcurrency: 8,
-},
{
-name: "Test concurrency is correct when numTargets does not divide evenly into maxConcurrent",
-parallelism: 1,
+name: "concurrency is correct when numTargets does not divide evenly into maxConcurrent",
maxConcurrent: 7,
numTargets: 4,
expectedConcurrency: 7,
},
{
-name: "Test Total Parallelism dividing evenly",
-parallelism: 1,
+name: "total concurrency dividing evenly",
maxConcurrent: 6,
numTargets: 2,
expectedConcurrency: 6,
},
{
-name: "Test Total Parallelism at least one worker per target",
-parallelism: 1,
+name: "total concurrency at least one processor per target",
maxConcurrent: 3,
numTargets: 6,
expectedConcurrency: 6,
@@ -63,9 +51,7 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := Config{
-Parallelism: tt.parallelism,
-MatchMaxConcurrency: tt.maxConcurrent > 0,
-MaxConcurrentRequests: tt.maxConcurrent,
+MaxConcurrent: tt.maxConcurrent,
}

w, err := newQuerierWorkerWithProcessor(cfg, NewMetrics(cfg, nil), log.NewNopLogger(), &mockProcessor{}, "", nil, nil)
1 change: 0 additions & 1 deletion pkg/querier/worker_service.go
@@ -25,7 +25,6 @@ type WorkerServiceConfig struct {
ReadEnabled bool
GrpcListenAddress string
GrpcListenPort int
-QuerierMaxConcurrent int
QuerierWorkerConfig *querier_worker.Config
QueryFrontendEnabled bool
QuerySchedulerEnabled bool
