Skip to content

Commit

Permalink
Merge pull request #10 from vinted/v0.28.0+vinted_update
Browse files Browse the repository at this point in the history
*: update fork
  • Loading branch information
GiedriusS committed Aug 10, 2022
2 parents 6670093 + 7211277 commit e3f8229
Show file tree
Hide file tree
Showing 12 changed files with 687 additions and 94 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Expand Up @@ -8,6 +8,12 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan

We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.)

## Performance

### Added

- [#4290](https://github.com/thanos-io/thanos/pull/4290) proxy: coalesce multiple requests for the same data; greatly improves performance when opening a dashboard without query-frontend where there are a lot of different panels (queries) asking for the same data

## Unreleased
- [#5453](https://github.com/thanos-io/thanos/pull/5453) Compact: Skip erroneous empty non `*AggrChunk` chunks during 1h downsampling of 5m resolution blocks.

Expand Down
11 changes: 10 additions & 1 deletion go.mod
Expand Up @@ -89,7 +89,7 @@ require (
go.uber.org/automaxprocs v1.5.1
go.uber.org/goleak v1.1.12
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e
golang.org/x/net v0.0.0-20220630215102-69896b714898
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
golang.org/x/text v0.3.7
golang.org/x/time v0.0.0-20220609170525-579cf78fd858
Expand All @@ -102,6 +102,10 @@ require (
gopkg.in/yaml.v3 v3.0.1
)

require github.com/viney-shih/go-cache v1.1.4

require github.com/rueian/rueidis v0.0.69

require (
cloud.google.com/go v0.102.0 // indirect
cloud.google.com/go/compute v1.7.0 // indirect
Expand Down Expand Up @@ -138,6 +142,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/sts v1.16.1 // indirect
github.com/aws/smithy-go v1.11.1 // indirect
github.com/baidubce/bce-sdk-go v0.9.111 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/chromedp/sysutil v1.0.0 // indirect
github.com/clbanning/mxj v1.8.4 // indirect
Expand Down Expand Up @@ -219,6 +224,7 @@ require (
github.com/tencentyun/cos-go-sdk-v5 v0.7.34 // indirect
github.com/tklauser/go-sysconf v0.3.4 // indirect
github.com/tklauser/numcpus v0.2.1 // indirect
github.com/vmihailenco/go-tinylfu v0.2.2 // indirect
github.com/weaveworks/promrus v1.2.0 // indirect
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
go.elastic.co/apm/module/apmhttp v1.11.0 // indirect
Expand All @@ -229,6 +235,7 @@ require (
go.opentelemetry.io/otel/metric v0.30.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/exp v0.0.0-20210526181343-b47a03e3048a // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/oauth2 v0.0.0-20220628200809-02e64fa58f26 // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
Expand Down Expand Up @@ -260,3 +267,5 @@ replace (
)

go 1.17

replace github.com/prometheus/prometheus => github.com/vinted/prometheus v1.8.2-0.20220808145920-5c879a061105
62 changes: 37 additions & 25 deletions go.sum

Large diffs are not rendered by default.

56 changes: 53 additions & 3 deletions pkg/cacheutil/memcached_client.go
Expand Up @@ -18,13 +18,15 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/viney-shih/go-cache"
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/discovery/dns"
memcacheDiscovery "github.com/thanos-io/thanos/pkg/discovery/memcache"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/store/cache/cachekey"
)

const (
Expand Down Expand Up @@ -179,6 +181,8 @@ type memcachedClient struct {
client memcachedClientBackend
selector updatableServerSelector

tLFU cache.Adapter

// Name provides an identifier for the instantiated Client
name string

Expand Down Expand Up @@ -275,7 +279,11 @@ func newMemcachedClient(
)
}

// 16KB (max) * 100000 = 1600MB.
tinyLfu := cache.NewTinyLFU(100000)

c := &memcachedClient{
tLFU: tinyLfu,
logger: log.With(logger, "name", name),
config: config,
client: client,
Expand Down Expand Up @@ -383,13 +391,17 @@ func (c *memcachedClient) Stop() {
c.workers.Wait()
}

func (c *memcachedClient) SetAsync(_ context.Context, key string, value []byte, ttl time.Duration) error {
func (c *memcachedClient) SetAsync(ctx context.Context, key string, value []byte, ttl time.Duration) error {
// Skip hitting memcached at all if the item is bigger than the max allowed size.
if c.config.MaxItemSize > 0 && uint64(len(value)) > uint64(c.config.MaxItemSize) {
c.skipped.WithLabelValues(opSet, reasonMaxItemSize).Inc()
return nil
}

if err := c.tLFU.MSet(ctx, map[string][]byte{key: value}, ttl); err != nil {
level.Warn(c.logger).Log("msg", "failed to put item in tinyLFU", "key", key, "err", err.Error())
}

err := c.enqueueAsync(func() {
start := time.Now()
c.operations.WithLabelValues(opSet).Inc()
Expand Down Expand Up @@ -431,6 +443,34 @@ func (c *memcachedClient) GetMulti(ctx context.Context, keys []string) map[strin
return nil
}

hits := map[string][]byte{}

// Check if they exist in memory.
values, err := c.tLFU.MGet(ctx, keys)
if err != nil {
level.Warn(c.logger).Log("msg", "failed to fetch items from tinyLFU", "err", err)
} else {
for i, v := range values {
if !v.Valid {
continue
}
hits[keys[i]] = v.Bytes
}
if len(hits) == len(keys) {
return hits
}

newKeys := make([]string, 0, len(keys)-len(hits))
for i, v := range values {
if v.Valid {
continue
}
newKeys = append(newKeys, keys[i])
}
keys = newKeys
}

// Ping memcached.
batches, err := c.getMultiBatched(ctx, keys)
if err != nil {
level.Warn(c.logger).Log("msg", "failed to fetch items from memcached", "numKeys", len(keys), "firstKey", keys[0], "err", err)
Expand All @@ -444,13 +484,23 @@ func (c *memcachedClient) GetMulti(ctx context.Context, keys []string) map[strin
}
}

hits := map[string][]byte{}
memcachedHits := map[string][]byte{}
for _, items := range batches {
for key, item := range items {
hits[key] = item.Value
memcachedHits[key] = item.Value
}
}

// TODO: change this to be dynamic. 5 minutes because our current minimum TTL is 5 minutes in /opt/thanos/store_cachingbucket.yml.
ttl := 5 * time.Minute
if strings.HasPrefix(keys[0], string(cachekey.SubrangeVerb)) {
ttl = 24 * time.Hour

}
if err := c.tLFU.MSet(ctx, memcachedHits, ttl); err != nil {
level.Warn(c.logger).Log("msg", "failed to set items in tinyLFU", "numKeys", len(keys), "firstKey", keys[0], "err", err)
}

return hits
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/cacheutil/redis_client.go
Expand Up @@ -43,6 +43,8 @@ type RedisClientConfig struct {
// Addr specifies the addresses of redis server.
Addr string `yaml:"addr"`

Addrs []string `yaml:"addrs"`

// Use the specified Username to authenticate the current connection
// with one of the connections defined in the ACL list when connecting
// to a Redis 6.0 instance, or greater, that is using the Redis ACL system.
Expand All @@ -56,6 +58,8 @@ type RedisClientConfig struct {
// DB Database to be selected after connecting to the server.
DB int `yaml:"db"`

CacheSize int `yaml:"cachesize"`

// DialTimeout specifies the client dial timeout.
DialTimeout time.Duration `yaml:"dial_timeout"`

Expand Down Expand Up @@ -97,7 +101,7 @@ type RedisClientConfig struct {
}

func (c *RedisClientConfig) validate() error {
if c.Addr == "" {
if c.Addr == "" && len(c.Addrs) == 0 {
return errors.New("no redis addr provided")
}
return nil
Expand Down
157 changes: 157 additions & 0 deletions pkg/cacheutil/rueidis_client.go
@@ -0,0 +1,157 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cacheutil

import (
"context"
"net"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rueian/rueidis"
)

// RueidisClient is a wrap of rueidis.Client.
type RueidisClient struct {
client rueidis.Client
config RedisClientConfig

logger log.Logger
durationSet prometheus.Observer
durationSetMulti prometheus.Observer
durationGetMulti prometheus.Observer
}

// NewRueidisClient makes a new RueidisClient.
func NewRueidisClient(logger log.Logger, name string, conf []byte, reg prometheus.Registerer) (*RueidisClient, error) {
config, err := parseRedisClientConfig(conf)
if err != nil {
return nil, err
}

return NewRueidisClientWithConfig(logger, name, config, reg)
}

// NewRueidisClientWithConfig makes a new RedisClient.
func NewRueidisClientWithConfig(logger log.Logger, name string, config RedisClientConfig,
reg prometheus.Registerer) (*RueidisClient, error) {

if err := config.validate(); err != nil {
return nil, err
}
var addrs []string

if len(config.Addrs) > 0 {
addrs = config.Addrs
} else {
addrs = []string{config.Addr}
}

var cacheSize int

if config.CacheSize != 0 {
cacheSize = config.CacheSize
} else {
cacheSize = 1024 * (1 << 20)
}

client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: addrs,
ShuffleInit: true,
Username: config.Username,
Password: config.Password,
SelectDB: config.DB,
CacheSizeEachConn: cacheSize,
Dialer: net.Dialer{Timeout: config.DialTimeout},
ConnWriteTimeout: config.WriteTimeout,
})
if err != nil {
return nil, err
}

if reg != nil {
reg = prometheus.WrapRegistererWith(prometheus.Labels{"name": name}, reg)
}

c := &RueidisClient{
client: client,
config: config,
logger: logger,
}
duration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_redis_operation_duration_seconds",
Help: "Duration of operations against redis.",
Buckets: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 3, 6, 10},
}, []string{"operation"})
c.durationSet = duration.WithLabelValues(opSet)
c.durationSetMulti = duration.WithLabelValues(opSetMulti)
c.durationGetMulti = duration.WithLabelValues(opGetMulti)
return c, nil
}

// SetAsync implement RemoteCacheClient.
func (c *RueidisClient) SetAsync(ctx context.Context, key string, value []byte, ttl time.Duration) error {
start := time.Now()
if err := c.client.Do(ctx, c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error(); err != nil {
level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key, "value_size", len(value))
return nil
}
c.durationSet.Observe(time.Since(start).Seconds())
return nil
}

// SetMulti set multiple keys and value.
func (c *RueidisClient) SetMulti(ctx context.Context, data map[string][]byte, ttl time.Duration) {
if len(data) == 0 {
return
}
start := time.Now()
sets := make(rueidis.Commands, 0, len(data))
ittl := int64(ttl.Seconds())
for k, v := range data {
sets = append(sets, c.client.B().Setex().Key(k).Seconds(ittl).Value(rueidis.BinaryString(v)).Build())
}
for _, resp := range c.client.DoMulti(ctx, sets...) {
if err := resp.Error(); err != nil {
level.Warn(c.logger).Log("msg", "failed to set multi items from redis", "err", err, "items", len(data))
return
}
}
c.durationSetMulti.Observe(time.Since(start).Seconds())
}

// GetMulti implement RemoteCacheClient.
func (c *RueidisClient) GetMulti(ctx context.Context, keys []string) map[string][]byte {
if len(keys) == 0 {
return nil
}
start := time.Now()
results := make(map[string][]byte, len(keys))

if c.config.ReadTimeout > 0 {
timeoutCtx, cancel := context.WithTimeout(ctx, c.config.ReadTimeout)
defer cancel()
ctx = timeoutCtx
}

resps, err := rueidis.MGetCache(c.client, ctx, 8*time.Hour, keys)
if err != nil {
level.Warn(c.logger).Log("msg", "failed to mget items from redis", "err", err, "items", len(resps))
}
for key, resp := range resps {
if val, err := resp.ToString(); err == nil {
results[key] = stringToBytes(val)
}
}
c.durationGetMulti.Observe(time.Since(start).Seconds())
return results
}

// Stop implement RemoteCacheClient.
func (c *RueidisClient) Stop() {
c.client.Close()
}

0 comments on commit e3f8229

Please sign in to comment.