Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistent hashing #5408

Merged
merged 3 commits into from Jun 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -18,6 +18,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#5352](https://github.com/thanos-io/thanos/pull/5352) Cache: Add cache metrics to groupcache.
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support.
- [#5408](https://github.com/thanos-io/thanos/pull/5391) Receive: Add support for consistent hashrings.

### Changed

Expand Down
11 changes: 9 additions & 2 deletions cmd/thanos/receive.go
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"path"
"strings"
"time"

extflag "github.com/efficientgo/tools/extkingpin"
Expand Down Expand Up @@ -426,7 +427,7 @@ func setupHashring(g *run.Group,
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
level.Info(logger).Log("msg", "the hashring initialized with config watcher.")
return receive.HashringFromConfigWatcher(ctx, updates, cw)
return receive.HashringFromConfigWatcher(ctx, receive.HashringAlgorithm(conf.hashringsAlgorithm), updates, cw)
}, func(error) {
cancel()
})
Expand All @@ -437,7 +438,7 @@ func setupHashring(g *run.Group,
)
// The Hashrings config file content given initialize configuration from content.
if len(conf.hashringsFileContent) > 0 {
ring, err = receive.HashringFromConfig(conf.hashringsFileContent)
ring, err = receive.HashringFromConfig(receive.HashringAlgorithm(conf.hashringsAlgorithm), conf.hashringsFileContent)
if err != nil {
close(updates)
return errors.Wrap(err, "failed to validate hashring configuration file")
Expand Down Expand Up @@ -718,6 +719,7 @@ type receiveConfig struct {

hashringsFilePath string
hashringsFileContent string
hashringsAlgorithm string

refreshInterval *model.Duration
endpoint string
Expand Down Expand Up @@ -779,6 +781,11 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("<content>").StringVar(&rc.hashringsFileContent)

hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)}, ", ")
cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings. Must be one of "+hashringAlgorithmsHelptext).
Default(string(receive.AlgorithmHashmod)).
EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama))

rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)").
Default("5m"))

Expand Down
3 changes: 3 additions & 0 deletions docs/components/receive.md
Expand Up @@ -126,6 +126,9 @@ Flags:
Alternative to 'receive.hashrings-file' flag
(lower priority). Content of file that contains
the hashring configuration.
--receive.hashrings-algorithm=hashmod
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kind of sad our tooling does not put those values in help. Do you mean explicitly adding those options for people to know how to use it?

The algorithm used when distributing series in
the hashrings. Must be one of hashmod, ketama
--receive.hashrings-file=<path>
Path to file that contains the hashring
configuration. A watcher is initialized to
Expand Down
3 changes: 3 additions & 0 deletions docs/components/tools.md
Expand Up @@ -101,6 +101,7 @@ type: GCS
config:
bucket: ""
service_account: ""
prefix: ""
```

Bucket can be extended to add more subcommands that will be helpful when working with object storage buckets by adding a new command within [`/cmd/thanos/tools_bucket.go`](../../cmd/thanos/tools_bucket.go) .
Expand Down Expand Up @@ -601,6 +602,7 @@ type: GCS
config:
bucket: ""
service_account: ""
prefix: ""
```

```$ mdox-exec="thanos tools bucket downsample --help"
Expand Down Expand Up @@ -675,6 +677,7 @@ type: GCS
config:
bucket: ""
service_account: ""
prefix: ""
```

```$ mdox-exec="thanos tools bucket mark --help"
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Expand Up @@ -98,6 +98,8 @@ require (
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)

require github.com/stretchr/testify v1.7.0

require (
cloud.google.com/go v0.99.0 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
Expand Down Expand Up @@ -186,7 +188,6 @@ require (
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/sony/gobreaker v0.4.1 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect
github.com/weaveworks/promrus v1.2.0 // indirect
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect
go.elastic.co/apm/module/apmhttp v1.11.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/handler_test.go
Expand Up @@ -324,7 +324,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint)
peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h}
}
hashring := newMultiHashring(cfg)
hashring := newMultiHashring(AlgorithmHashmod, cfg)
for _, h := range handlers {
h.Hashring(hashring)
}
Expand Down
117 changes: 110 additions & 7 deletions pkg/receive/hashring.go
Expand Up @@ -6,14 +6,31 @@ package receive
import (
"context"
"fmt"
"sort"
"strconv"
"sync"

"github.com/cespare/xxhash"

"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/store/labelpb"

"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

// HashringAlgorithm is the algorithm used to distribute series in the ring.
type HashringAlgorithm string

const (
AlgorithmHashmod HashringAlgorithm = "hashmod"
AlgorithmKetama HashringAlgorithm = "ketama"

// SectionsPerNode is the number of sections in the ring assigned to each node
// in the ketama hashring. A higher number yields a better series distribution,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe be clear about what "better" means. More "uniform"? more "localized"? More "random"? etc

// but also comes with a higher memory cost.
SectionsPerNode = 1000
)

// insufficientNodesError is returned when a hashring does not
// have enough nodes to satisfy a request for a node.
type insufficientNodesError struct {
Expand Down Expand Up @@ -52,7 +69,7 @@ func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (stri
return string(s), nil
}

// simpleHashring represents a group of nodes handling write requests.
// simpleHashring represents a group of nodes handling write requests by hashmoding individual series.
type simpleHashring []string

// Get returns a target to handle the given tenant and time series.
Expand All @@ -69,6 +86,81 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
return s[(labelpb.HashWithPrefix(tenant, ts.Labels)+n)%uint64(len(s))], nil
}

type section struct {
endpointIndex uint64
hash uint64
}

type sections []section

func (p sections) Len() int { return len(p) }
func (p sections) Less(i, j int) bool { return p[i].hash < p[j].hash }
func (p sections) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p sections) Sort() { sort.Sort(p) }

// ketamaHashring represents a group of nodes handling write requests with consistent hashing.
type ketamaHashring struct {
endpoints []string
sections sections
numEndpoints uint64
}

func newKetamaHashring(endpoints []string, sectionsPerNode int) *ketamaHashring {
// Replication works by choosing subsequent nodes in the ring.
// In order to improve consistency, we avoid relying on the ordering of the endpoints
// and sort them lexicographically.
sort.Strings(endpoints)

numSections := len(endpoints) * sectionsPerNode
ring := ketamaHashring{
endpoints: endpoints,
sections: make(sections, 0, numSections),
numEndpoints: uint64(len(endpoints)),
}

hash := xxhash.New()
for endpointIndex, endpoint := range endpoints {
for i := 1; i <= sectionsPerNode; i++ {
_, _ = hash.Write([]byte(endpoint + ":" + strconv.Itoa(i)))
n := &section{
endpointIndex: uint64(endpointIndex),
hash: hash.Sum64(),
}

ring.sections = append(ring.sections, *n)
hash.Reset()
}
}
sort.Sort(ring.sections)
return &ring
}

func (c ketamaHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) {
return c.GetN(tenant, ts, 0)
}

func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (string, error) {
if n >= c.numEndpoints {
return "", &insufficientNodesError{have: c.numEndpoints, want: n + 1}
}

v := labelpb.HashWithPrefix(tenant, ts.Labels)

var i uint64
i = uint64(sort.Search(len(c.sections), func(i int) bool {
return c.sections[i].hash >= v
}))

numSections := uint64(len(c.sections))
if i == numSections {
i = 0
}

nodeIndex := (c.sections[i].endpointIndex + n) % c.numEndpoints

return c.endpoints[nodeIndex], nil
}

// multiHashring represents a set of hashrings.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
Expand Down Expand Up @@ -121,13 +213,24 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
// groups.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
func newMultiHashring(cfg []HashringConfig) Hashring {
func newMultiHashring(algorithm HashringAlgorithm, cfg []HashringConfig) Hashring {
m := &multiHashring{
cache: make(map[string]Hashring),
}

newHashring := func(endpoints []string) Hashring {
switch algorithm {
case AlgorithmHashmod:
return simpleHashring(endpoints)
case AlgorithmKetama:
return newKetamaHashring(endpoints, SectionsPerNode)
default:
return simpleHashring(endpoints)
}
}

for _, h := range cfg {
m.hashrings = append(m.hashrings, simpleHashring(h.Endpoints))
m.hashrings = append(m.hashrings, newHashring(h.Endpoints))
var t map[string]struct{}
if len(h.Tenants) != 0 {
t = make(map[string]struct{})
Expand All @@ -147,7 +250,7 @@ func newMultiHashring(cfg []HashringConfig) Hashring {
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
// The updates chan is closed before exiting.
func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) error {
func HashringFromConfigWatcher(ctx context.Context, algorithm HashringAlgorithm, updates chan<- Hashring, cw *ConfigWatcher) error {
defer close(updates)
go cw.Run(ctx)

Expand All @@ -157,15 +260,15 @@ func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw
if !ok {
return errors.New("hashring config watcher stopped unexpectedly")
}
updates <- newMultiHashring(cfg)
updates <- newMultiHashring(algorithm, cfg)
case <-ctx.Done():
return ctx.Err()
}
}
}

// HashringFromConfig loads raw configuration content and returns a Hashring if the given configuration is not valid.
func HashringFromConfig(content string) (Hashring, error) {
func HashringFromConfig(algorithm HashringAlgorithm, content string) (Hashring, error) {
config, err := parseConfig([]byte(content))
if err != nil {
return nil, errors.Wrapf(err, "failed to parse configuration")
Expand All @@ -176,5 +279,5 @@ func HashringFromConfig(content string) (Hashring, error) {
return nil, errors.Wrapf(err, "failed to load configuration")
}

return newMultiHashring(config), err
return newMultiHashring(algorithm, config), err
}