/
config.go
270 lines (235 loc) · 9.33 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package queryfrontend
import (
"strings"
"time"
cortexcache "github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
cortexvalidation "github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"
"gopkg.in/yaml.v2"
extflag "github.com/efficientgo/tools/extkingpin"
prommodel "github.com/prometheus/common/model"
"github.com/thanos-io/thanos/pkg/cacheutil"
"github.com/thanos-io/thanos/pkg/model"
)
type ResponseCacheProvider string
const (
INMEMORY ResponseCacheProvider = "IN-MEMORY"
MEMCACHED ResponseCacheProvider = "MEMCACHED"
REDIS ResponseCacheProvider = "REDIS"
)
var (
defaultMemcachedConfig = MemcachedResponseCacheConfig{
Memcached: cacheutil.MemcachedClientConfig{
Timeout: 500 * time.Millisecond,
MaxIdleConnections: 100,
MaxAsyncConcurrency: 10,
MaxAsyncBufferSize: 10000,
MaxGetMultiConcurrency: 100,
MaxGetMultiBatchSize: 0,
MaxItemSize: model.Bytes(1024 * 1024),
DNSProviderUpdateInterval: 10 * time.Second,
},
Expiration: 24 * time.Hour,
}
// DefaultRedisConfig is default redis config for queryfrontend.
DefaultRedisConfig = RedisResponseCacheConfig{
Redis: cacheutil.DefaultRedisClientConfig,
Expiration: 24 * time.Hour,
}
)
// InMemoryResponseCacheConfig holds the configs for the in-memory cache provider.
type InMemoryResponseCacheConfig struct {
// MaxSize represents overall maximum number of bytes cache can contain.
MaxSize string `yaml:"max_size"`
// MaxSizeItems represents the maximum number of entries in the cache.
MaxSizeItems int `yaml:"max_size_items"`
// Validity represents the expiry duration for the cache.
Validity time.Duration `yaml:"validity"`
}
// MemcachedResponseCacheConfig holds the configs for the memcache cache provider.
type MemcachedResponseCacheConfig struct {
Memcached cacheutil.MemcachedClientConfig `yaml:",inline"`
// Expiration sets a global expiration limit for all cached items.
Expiration time.Duration `yaml:"expiration"`
}
// RedisResponseCacheConfig holds the configs for the redis cache provider.
type RedisResponseCacheConfig struct {
Redis cacheutil.RedisClientConfig `yaml:",inline"`
// Expiration sets a global expiration limit for all cached items.
Expiration time.Duration `yaml:"expiration"`
}
// CacheProviderConfig is the initial CacheProviderConfig struct holder before parsing it into a specific cache provider.
// Based on the config type the config is then parsed into a specific cache provider.
type CacheProviderConfig struct {
Type ResponseCacheProvider `yaml:"type"`
Config interface{} `yaml:"config"`
}
// NewCacheConfig is a parser that converts a Thanos cache config yaml into a cortex cache config struct.
func NewCacheConfig(logger log.Logger, confContentYaml []byte) (*cortexcache.Config, error) {
cacheConfig := &CacheProviderConfig{}
if err := yaml.UnmarshalStrict(confContentYaml, cacheConfig); err != nil {
return nil, errors.Wrap(err, "parsing config YAML file")
}
backendConfig, err := yaml.Marshal(cacheConfig.Config)
if err != nil {
return nil, errors.Wrap(err, "marshal content of cache backend configuration")
}
switch strings.ToUpper(string(cacheConfig.Type)) {
case string(INMEMORY):
var config InMemoryResponseCacheConfig
if err := yaml.Unmarshal(backendConfig, &config); err != nil {
return nil, err
}
return &cortexcache.Config{
EnableFifoCache: true,
Fifocache: cortexcache.FifoCacheConfig{
MaxSizeBytes: config.MaxSize,
MaxSizeItems: config.MaxSizeItems,
Validity: config.Validity,
},
}, nil
case string(MEMCACHED):
config := defaultMemcachedConfig
if err := yaml.UnmarshalStrict(backendConfig, &config); err != nil {
return nil, err
}
if config.Expiration == 0 {
level.Warn(logger).Log("msg", "memcached cache valid time set to 0, so using a default of 24 hours expiration time")
config.Expiration = 24 * time.Hour
}
if config.Memcached.DNSProviderUpdateInterval <= 0 {
level.Warn(logger).Log("msg", "memcached dns provider update interval time set to invalid value, defaulting to 10s")
config.Memcached.DNSProviderUpdateInterval = 10 * time.Second
}
if config.Memcached.MaxAsyncConcurrency <= 0 {
level.Warn(logger).Log("msg", "memcached max async concurrency must be positive, defaulting to 10")
config.Memcached.MaxAsyncConcurrency = 10
}
return &cortexcache.Config{
Memcache: cortexcache.MemcachedConfig{
Expiration: config.Expiration,
Parallelism: config.Memcached.MaxGetMultiConcurrency,
BatchSize: config.Memcached.MaxGetMultiBatchSize,
},
MemcacheClient: cortexcache.MemcachedClientConfig{
Timeout: config.Memcached.Timeout,
MaxIdleConns: config.Memcached.MaxIdleConnections,
Addresses: strings.Join(config.Memcached.Addresses, ","),
UpdateInterval: config.Memcached.DNSProviderUpdateInterval,
MaxItemSize: int(config.Memcached.MaxItemSize),
},
Background: cortexcache.BackgroundConfig{
WriteBackBuffer: config.Memcached.MaxAsyncBufferSize,
WriteBackGoroutines: config.Memcached.MaxAsyncConcurrency,
},
}, nil
case string(REDIS):
config := DefaultRedisConfig
if err := yaml.UnmarshalStrict(backendConfig, &config); err != nil {
return nil, err
}
if config.Expiration <= 0 {
level.Warn(logger).Log("msg", "redis cache valid time set to 0, so using a default of 24 hours expiration time")
config.Expiration = 24 * time.Hour
}
return &cortexcache.Config{
Redis: cortexcache.RedisConfig{
Endpoint: config.Redis.Addr,
Timeout: config.Redis.ReadTimeout,
Expiration: config.Expiration,
DB: config.Redis.DB,
PoolSize: config.Redis.PoolSize,
Password: flagext.Secret{Value: config.Redis.Password},
IdleTimeout: config.Redis.IdleTimeout,
MaxConnAge: config.Redis.MaxConnAge,
},
Background: cortexcache.BackgroundConfig{
WriteBackBuffer: config.Redis.MaxSetMultiConcurrency * config.Redis.SetMultiBatchSize,
WriteBackGoroutines: config.Redis.MaxSetMultiConcurrency,
},
}, nil
default:
return nil, errors.Errorf("response cache with type %s is not supported", cacheConfig.Type)
}
}
// DownstreamTripperConfig stores the http.Transport configuration for query-frontend's HTTP downstream tripper.
type DownstreamTripperConfig struct {
IdleConnTimeout prommodel.Duration `yaml:"idle_conn_timeout"`
ResponseHeaderTimeout prommodel.Duration `yaml:"response_header_timeout"`
TLSHandshakeTimeout prommodel.Duration `yaml:"tls_handshake_timeout"`
ExpectContinueTimeout prommodel.Duration `yaml:"expect_continue_timeout"`
MaxIdleConns *int `yaml:"max_idle_conns"`
MaxIdleConnsPerHost *int `yaml:"max_idle_conns_per_host"`
MaxConnsPerHost *int `yaml:"max_conns_per_host"`
CachePathOrContent extflag.PathOrContent
}
// Config holds the query frontend configs.
type Config struct {
QueryRangeConfig
LabelsConfig
DownstreamTripperConfig
CortexHandlerConfig *transport.HandlerConfig
CompressResponses bool
CacheCompression string
RequestLoggingDecision string
DownstreamURL string
ForwardHeaders []string
}
// QueryRangeConfig holds the config for query range tripperware.
type QueryRangeConfig struct {
// PartialResponseStrategy is the default strategy used
// when parsing thanos query request.
PartialResponseStrategy bool
ResultsCacheConfig *queryrange.ResultsCacheConfig
CachePathOrContent extflag.PathOrContent
AlignRangeWithStep bool
RequestDownsampled bool
SplitQueriesByInterval time.Duration
MaxRetries int
Limits *cortexvalidation.Limits
}
// LabelsConfig holds the config for labels tripperware.
type LabelsConfig struct {
// PartialResponseStrategy is the default strategy used
// when parsing thanos query request.
PartialResponseStrategy bool
DefaultTimeRange time.Duration
ResultsCacheConfig *queryrange.ResultsCacheConfig
CachePathOrContent extflag.PathOrContent
SplitQueriesByInterval time.Duration
MaxRetries int
Limits *cortexvalidation.Limits
}
// Validate a fully initialized config.
func (cfg *Config) Validate() error {
if cfg.QueryRangeConfig.ResultsCacheConfig != nil {
if cfg.QueryRangeConfig.SplitQueriesByInterval <= 0 {
return errors.New("split queries interval should be greater than 0 when caching is enabled")
}
if err := cfg.QueryRangeConfig.ResultsCacheConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid ResultsCache config for query_range tripperware")
}
}
if cfg.LabelsConfig.ResultsCacheConfig != nil {
if cfg.LabelsConfig.SplitQueriesByInterval <= 0 {
return errors.New("split queries interval should be greater than 0 when caching is enabled")
}
if err := cfg.LabelsConfig.ResultsCacheConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid ResultsCache config for labels tripperware")
}
}
if cfg.LabelsConfig.DefaultTimeRange == 0 {
return errors.New("labels.default-time-range cannot be set to 0")
}
if cfg.DownstreamURL == "" {
return errors.New("downstream URL should be configured")
}
return nil
}