From 0be191309cae301baba203eeb57d80781f5f1615 Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Tue, 10 Oct 2023 19:28:20 +0530 Subject: [PATCH] config: better defaults for querier max concurrency (#10785) **What this PR does / why we need it**: To simply loki configuration, this pr removes `querier.worker-parallelism` as it does not offer additional value to already existing `querier.max-concurrent` Also updates the default value of `querier.max-concurrent` to 4. This is a better default for users that are just starting out. Users can consider increasing it as they scale up their loki installation. **Which issue(s) this PR fixes**: Fixes # **Special notes for your reviewer**: **Checklist** - [X] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [X] Documentation added - [X] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [X] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) --- CHANGELOG.md | 1 + docs/sources/configure/_index.md | 15 ++----- docs/sources/setup/upgrade/_index.md | 18 ++++---- pkg/loki/modules.go | 5 +-- pkg/lokifrontend/frontend/v1/frontend_test.go | 4 +- pkg/querier/querier.go | 2 +- pkg/querier/worker/worker.go | 41 ++++++------------- pkg/querier/worker/worker_test.go | 24 +++-------- pkg/querier/worker_service.go | 1 - 9 files changed, 37 insertions(+), 74 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d0ed9fc8c78..0423b302991c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ * [10709](https://github.com/grafana/loki/pull/10709) **chaudum**/**salvacorts** Remove `ingester.max-transfer-retries` configuration option in favor of using the WAL. * [10736](https://github.com/grafana/loki/pull/10736) **ashwanthgoli** Deprecate write dedupe cache as this is not required by the newer single store indexes (tsdb and boltdb-shipper). * [10693](https://github.com/grafana/loki/pull/10693) **ashwanthgoli** Embedded cache: Updates the metric prefix from `querier_cache_` to `loki_embeddedcache_` and removes duplicate metrics. +* [10785](https://github.com/grafana/loki/pull/10785) **ashwanthgoli** Config: Removes `querier.worker-parallelism` and updates default value of `querier.max-concurrent` to 4. ##### Fixes diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index a992f23198a2..24ba529e201a 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -544,9 +544,10 @@ engine: # CLI flag: -querier.engine.max-lookback-period [max_look_back_period: | default = 30s] -# The maximum number of concurrent queries allowed. +# The maximum number of queries that can be simultaneously processed by the +# querier. # CLI flag: -querier.max-concurrent -[max_concurrent: | default = 10] +[max_concurrent: | default = 4] # Only query the store, and not attempt any ingesters. This is useful for # running a standalone querier pool operating only against stored data. @@ -2759,16 +2760,6 @@ The `frontend_worker` configures the worker - running within the Loki querier - # CLI flag: -querier.dns-lookup-period [dns_lookup_duration: | default = 3s] -# Number of simultaneous queries to process per query-frontend or -# query-scheduler. -# CLI flag: -querier.worker-parallelism -[parallelism: | default = 10] - -# Force worker concurrency to match the -querier.max-concurrent option. -# Overrides querier.worker-parallelism. -# CLI flag: -querier.worker-match-max-concurrent -[match_max_concurrent: | default = true] - # Querier ID, sent to frontend service to identify requests from the same # querier. Defaults to hostname. # CLI flag: -querier.id diff --git a/docs/sources/setup/upgrade/_index.md b/docs/sources/setup/upgrade/_index.md index 9268676d7e65..36371dbda679 100644 --- a/docs/sources/setup/upgrade/_index.md +++ b/docs/sources/setup/upgrade/_index.md @@ -48,13 +48,15 @@ The previous default value `false` is applied. #### Deprecated configuration options are removed 1. Removes already deprecated `-querier.engine.timeout` CLI flag and the corresponding YAML setting. -2. Also removes the `query_timeout` from the querier YAML section. Instead of configuring `query_timeout` under `querier`, you now configure it in [Limits Config](/docs/loki/latest/configuration/#limits_config). -3. `s3.sse-encryption` is removed. AWS now defaults encryption of all buckets to SSE-S3. Use `sse.type` to set SSE type. -4. `ruler.wal-cleaer.period` is removed. Use `ruler.wal-cleaner.period` instead. -5. `experimental.ruler.enable-api` is removed. Use `ruler.enable-api` instead. -6. `split_queries_by_interval` is removed from `query_range` YAML section. You can instead configure it in [Limits Config](/docs/loki/latest/configuration/#limits_config). -7. `frontend.forward-headers-list` CLI flag and its corresponding YAML setting are removed. -8. `frontend.cache-split-interval` CLI flag is removed. Results caching interval is now determined by `querier.split-queries-by-interval`. +1. Also removes the `query_timeout` from the querier YAML section. Instead of configuring `query_timeout` under `querier`, you now configure it in [Limits Config](/docs/loki/latest/configuration/#limits_config). +1. `s3.sse-encryption` is removed. AWS now defaults encryption of all buckets to SSE-S3. Use `sse.type` to set SSE type. +1. `ruler.wal-cleaer.period` is removed. Use `ruler.wal-cleaner.period` instead. +1. `experimental.ruler.enable-api` is removed. Use `ruler.enable-api` instead. +1. `split_queries_by_interval` is removed from `query_range` YAML section. You can instead configure it in [Limits Config](/docs/loki/latest/configuration/#limits_config). +1. `frontend.forward-headers-list` CLI flag and its corresponding YAML setting are removed. +1. `frontend.cache-split-interval` CLI flag is removed. Results caching interval is now determined by `querier.split-queries-by-interval`. +1. `querier.worker-parallelism` CLI flag and its corresponding yaml setting are now removed as it does not offer additional value to already existing `querier.max-concurrent`. + We recommend configuring `querier.max-concurrent` to limit the max concurrent requests processed by the queriers. #### Legacy ingester shutdown handler is removed @@ -76,6 +78,8 @@ This new metric will provide a more clear signal that there is an issue with ing #### Changes to default configuration values +1. `querier.max-concurrent` now defaults to 4. Consider increasing this if queriers have access to more CPU resources. + Note that you risk running into out of memory errors if you set this to a very high value. 1. `frontend.embedded-cache.max-size-mb` Embedded results cache size now defaults to 100MB. #### Write dedupe cache is deprecated diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index a1deffd1029a..9efef05a78fc 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -332,8 +332,8 @@ func (t *Loki) initQuerier() (services.Service, error) { if t.Cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 { t.Cfg.Querier.IngesterQueryStoreMaxLookback = t.Cfg.Ingester.QueryStoreMaxLookBackPeriod } - // Querier worker's max concurrent requests must be the same as the querier setting - t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent + // Querier worker's max concurrent must be the same as the querier setting + t.Cfg.Worker.MaxConcurrent = t.Cfg.Querier.MaxConcurrent deleteStore, err := t.deleteRequestsClient("querier", t.Overrides) if err != nil { @@ -357,7 +357,6 @@ func (t *Loki) initQuerier() (services.Service, error) { ReadEnabled: t.Cfg.isModuleEnabled(Read), GrpcListenAddress: t.Cfg.Server.GRPCListenAddress, GrpcListenPort: t.Cfg.Server.GRPCListenPort, - QuerierMaxConcurrent: t.Cfg.Querier.MaxConcurrent, QuerierWorkerConfig: &t.Cfg.Worker, QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend), QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler), diff --git a/pkg/lokifrontend/frontend/v1/frontend_test.go b/pkg/lokifrontend/frontend/v1/frontend_test.go index 2fc3712ba326..1ac564eb3404 100644 --- a/pkg/lokifrontend/frontend/v1/frontend_test.go +++ b/pkg/lokifrontend/frontend/v1/frontend_test.go @@ -227,9 +227,7 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a var workerConfig querier_worker.Config flagext.DefaultValues(&workerConfig) - workerConfig.Parallelism = 1 - workerConfig.MatchMaxConcurrency = matchMaxConcurrency - workerConfig.MaxConcurrentRequests = 1 + workerConfig.MaxConcurrent = 1 // localhost:0 prevents firewall warnings on Mac OS X. grpcListen, err := net.Listen("tcp", "localhost:0") diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 870e5b474ab3..f2333415e6e4 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -67,7 +67,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.TailMaxDuration, "querier.tail-max-duration", 1*time.Hour, "Maximum duration for which the live tailing requests are served.") f.DurationVar(&cfg.ExtraQueryDelay, "querier.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.") f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 3*time.Hour, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.") - f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 10, "The maximum number of concurrent queries allowed.") + f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 4, "The maximum number of queries that can be simultaneously processed by the querier.") f.BoolVar(&cfg.QueryStoreOnly, "querier.query-store-only", false, "Only query the store, and not attempt any ingesters. This is useful for running a standalone querier pool operating only against stored data.") f.BoolVar(&cfg.QueryIngesterOnly, "querier.query-ingester-only", false, "When true, queriers only query the ingesters, and not stored data. This is useful when the object store is unavailable.") f.BoolVar(&cfg.MultiTenantQueriesEnabled, "querier.multi-tenant-queries-enabled", false, "When true, allow queries to span multiple tenants.") diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go index a6f3824346b7..7c88ac92b816 100644 --- a/pkg/querier/worker/worker.go +++ b/pkg/querier/worker/worker.go @@ -25,10 +25,7 @@ type Config struct { FrontendAddress string `yaml:"frontend_address"` SchedulerAddress string `yaml:"scheduler_address"` DNSLookupPeriod time.Duration `yaml:"dns_lookup_duration"` - - Parallelism int `yaml:"parallelism"` - MatchMaxConcurrency bool `yaml:"match_max_concurrent"` - MaxConcurrentRequests int `yaml:"-"` // Must be same as passed to LogQL Engine. + MaxConcurrent int `yaml:"-"` // same as querier.max-concurrent. QuerierID string `yaml:"id"` @@ -38,11 +35,7 @@ type Config struct { func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.SchedulerAddress, "querier.scheduler-address", "", "Hostname (and port) of scheduler that querier will periodically resolve, connect to and receive queries from. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.") f.StringVar(&cfg.FrontendAddress, "querier.frontend-address", "", "Address of query frontend service, in host:port format. If -querier.scheduler-address is set as well, querier will use scheduler instead. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.") - f.DurationVar(&cfg.DNSLookupPeriod, "querier.dns-lookup-period", 3*time.Second, "How often to query DNS for query-frontend or query-scheduler address. Also used to determine how often to poll the scheduler-ring for addresses if the scheduler-ring is configured.") - - f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process per query-frontend or query-scheduler.") - f.BoolVar(&cfg.MatchMaxConcurrency, "querier.worker-match-max-concurrent", true, "Force worker concurrency to match the -querier.max-concurrent option. Overrides querier.worker-parallelism.") f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to frontend service to identify requests from the same querier. Defaults to hostname.") cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f) @@ -235,27 +228,23 @@ func (w *querierWorker) AddressRemoved(address string) { // Must be called with lock. func (w *querierWorker) resetConcurrency() { - totalConcurrency := 0 + var ( + index, totalConcurrency int + ) + defer func() { w.metrics.concurrentWorkers.Set(float64(totalConcurrency)) }() - index := 0 for _, m := range w.managers { - concurrency := 0 - - if w.cfg.MatchMaxConcurrency { - concurrency = w.cfg.MaxConcurrentRequests / len(w.managers) - - // If max concurrency does not evenly divide into our frontends a subset will be chosen - // to receive an extra connection. Frontend addresses were shuffled above so this will be a - // random selection of frontends. - if index < w.cfg.MaxConcurrentRequests%len(w.managers) { - level.Warn(w.logger).Log("msg", "max concurrency is not evenly divisible across targets, adding an extra connection", "addr", m.address) - concurrency++ - } - } else { - concurrency = w.cfg.Parallelism + concurrency := w.cfg.MaxConcurrent / len(w.managers) + + // If max concurrency does not evenly divide into our frontends a subset will be chosen + // to receive an extra connection. Frontend addresses were shuffled above so this will be a + // random selection of frontends. + if index < w.cfg.MaxConcurrent%len(w.managers) { + level.Warn(w.logger).Log("msg", "max concurrency is not evenly divisible across targets, adding an extra connection", "addr", m.address) + concurrency++ } // If concurrency is 0 then MaxConcurrentRequests is less than the total number of @@ -270,10 +259,6 @@ func (w *querierWorker) resetConcurrency() { m.concurrency(concurrency) index++ } - - if totalConcurrency > w.cfg.MaxConcurrentRequests { - level.Warn(w.logger).Log("msg", "total worker concurrency is greater than logql max concurrency. Queries may be queued in the querier which reduces QOS") - } } func (w *querierWorker) connect(ctx context.Context, address string) (*grpc.ClientConn, error) { diff --git a/pkg/querier/worker/worker_test.go b/pkg/querier/worker/worker_test.go index b81856d7d3f9..2f1ccb98d309 100644 --- a/pkg/querier/worker/worker_test.go +++ b/pkg/querier/worker/worker_test.go @@ -18,42 +18,30 @@ import ( func TestResetConcurrency(t *testing.T) { tests := []struct { name string - parallelism int maxConcurrent int numTargets int expectedConcurrency int }{ { - name: "Test create at least one processor per target", - parallelism: 0, + name: "create at least one processor per target", maxConcurrent: 0, numTargets: 2, expectedConcurrency: 2, }, { - name: "Test concurrency equal to parallelism * target when MatchMaxConcurrency is false", - parallelism: 4, - maxConcurrent: 0, - numTargets: 2, - expectedConcurrency: 8, - }, - { - name: "Test concurrency is correct when numTargets does not divide evenly into maxConcurrent", - parallelism: 1, + name: "concurrency is correct when numTargets does not divide evenly into maxConcurrent", maxConcurrent: 7, numTargets: 4, expectedConcurrency: 7, }, { - name: "Test Total Parallelism dividing evenly", - parallelism: 1, + name: "total concurrency dividing evenly", maxConcurrent: 6, numTargets: 2, expectedConcurrency: 6, }, { - name: "Test Total Parallelism at least one worker per target", - parallelism: 1, + name: "total concurrency at least one processor per target", maxConcurrent: 3, numTargets: 6, expectedConcurrency: 6, @@ -63,9 +51,7 @@ func TestResetConcurrency(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { cfg := Config{ - Parallelism: tt.parallelism, - MatchMaxConcurrency: tt.maxConcurrent > 0, - MaxConcurrentRequests: tt.maxConcurrent, + MaxConcurrent: tt.maxConcurrent, } w, err := newQuerierWorkerWithProcessor(cfg, NewMetrics(cfg, nil), log.NewNopLogger(), &mockProcessor{}, "", nil, nil) diff --git a/pkg/querier/worker_service.go b/pkg/querier/worker_service.go index 6e174522ac8b..7565c8f3cb42 100644 --- a/pkg/querier/worker_service.go +++ b/pkg/querier/worker_service.go @@ -25,7 +25,6 @@ type WorkerServiceConfig struct { ReadEnabled bool GrpcListenAddress string GrpcListenPort int - QuerierMaxConcurrent int QuerierWorkerConfig *querier_worker.Config QueryFrontendEnabled bool QuerySchedulerEnabled bool