Consistent hashing #5408

Merged
merged 3 commits into from Jun 16, 2022
Changes from 1 commit
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -18,6 +18,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#5352](https://github.com/thanos-io/thanos/pull/5352) Cache: Add cache metrics to groupcache.
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support.
- [#5408](https://github.com/thanos-io/thanos/pull/5408) Receive: Add support for consistent hashrings.

### Changed

7 changes: 5 additions & 2 deletions cmd/thanos/receive.go
@@ -426,7 +426,7 @@ func setupHashring(g *run.Group,
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
level.Info(logger).Log("msg", "the hashring initialized with config watcher.")
-return receive.HashringFromConfigWatcher(ctx, updates, cw)
+return receive.HashringFromConfigWatcher(ctx, receive.HashringAlgorithm(conf.hashringsAlgorithm), updates, cw)
}, func(error) {
cancel()
})
@@ -437,7 +437,7 @@ func setupHashring(g *run.Group,
)
// If the hashrings config file content is given, initialize the configuration from that content.
if len(conf.hashringsFileContent) > 0 {
-ring, err = receive.HashringFromConfig(conf.hashringsFileContent)
+ring, err = receive.HashringFromConfig(receive.HashringAlgorithm(conf.hashringsAlgorithm), conf.hashringsFileContent)
if err != nil {
close(updates)
return errors.Wrap(err, "failed to validate hashring configuration file")
@@ -718,6 +718,7 @@ type receiveConfig struct {

hashringsFilePath string
hashringsFileContent string
hashringsAlgorithm string

refreshInterval *model.Duration
endpoint string
@@ -779,6 +780,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("<content>").StringVar(&rc.hashringsFileContent)

cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings.").Default(string(receive.AlgorithmHashmod)).EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmConsistent))

rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)").
Default("5m"))

3 changes: 3 additions & 0 deletions docs/components/receive.md
@@ -126,6 +126,9 @@ Flags:
Alternative to 'receive.hashrings-file' flag
(lower priority). Content of file that contains
the hashring configuration.
--receive.hashrings-algorithm=hashmod
Member

It's kind of sad that our tooling does not put those values in the help output. Do you mind explicitly adding those options so people know how to use the flag?

The algorithm used when distributing series in
the hashrings.
--receive.hashrings-file=<path>
Path to file that contains the hashring
configuration. A watcher is initialized to
2 changes: 1 addition & 1 deletion pkg/receive/handler_test.go
@@ -324,7 +324,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint)
peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h}
}
-hashring := newMultiHashring(cfg)
+hashring := newMultiHashring(AlgorithmHashmod, cfg)
for _, h := range handlers {
h.Hashring(hashring)
}
117 changes: 110 additions & 7 deletions pkg/receive/hashring.go
@@ -6,14 +6,31 @@ package receive
import (
"context"
"fmt"
"sort"
"strconv"
"sync"

"github.com/cespare/xxhash"

"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/store/labelpb"

"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

// HashringAlgorithm is the algorithm used to distribute series in the ring.
type HashringAlgorithm string

const (
AlgorithmHashmod HashringAlgorithm = "hashmod"
AlgorithmConsistent HashringAlgorithm = "consistent"
Contributor

Should this be called "ketama" instead of "consistent"? So that other consistent hashing algorithms can be added in the future without breaking backwards compatibility.

Contributor Author

I agree, we should give it a more specific name. We can change it to either ring-hash, ketama, dynamo or something along those lines. I'm open to suggestions :)

Contributor

ketama sounds like a good one to me. It's one algorithm that uses a ring-hash. There could be other algorithms using ring-hashes and then we would have a name conflict.

Contributor Author

Sounds good 👍 Renamed to ketama

Member

I have to agree with @douglascamata. I think it would be quite good to specify exactly which implementation we use. If it's based on libketama from https://dgryski.medium.com/consistent-hashing-algorithmic-tradeoffs-ef6b8e2fcae8, let's mention that in a comment, name this ketama, and rename the structs.

Contributor Author

👍


// SectionsPerNode is the number of sections in the ring assigned to each node
// when using consistent hashing. A higher number yields a better series distribution,
// but also comes with a higher memory cost.
SectionsPerNode = 1000
)
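
The trade-off described in the SectionsPerNode comment can be illustrated with a small standalone sketch. This is not the code from this PR: `hash64`, `keysPerNode`, the `node-*` endpoint names and the `series-N` keys are hypothetical, and SHA-256 stands in for xxhash so the example only needs the standard library. It builds a ring for several values of sections per node and counts how many synthetic keys land on each endpoint.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"sort"
	"strconv"
)

// hash64 is an assumption for this sketch: it stands in for the xxhash
// and label hashing used in the diff.
func hash64(s string) uint64 {
	sum := sha256.Sum256([]byte(s))
	return binary.BigEndian.Uint64(sum[:8])
}

// ringSection is one point on the ring, owned by endpoints[owner].
type ringSection struct {
	hash  uint64
	owner int
}

// keysPerNode builds a ring with sectionsPerNode sections per endpoint and
// returns the ring size and the number of synthetic keys owned by each endpoint.
func keysPerNode(endpoints []string, sectionsPerNode, numKeys int) (int, []int) {
	counts := make([]int, len(endpoints))
	ring := make([]ringSection, 0, len(endpoints)*sectionsPerNode)
	for idx, e := range endpoints {
		for i := 1; i <= sectionsPerNode; i++ {
			ring = append(ring, ringSection{hash: hash64(e + ":" + strconv.Itoa(i)), owner: idx})
		}
	}
	if len(ring) == 0 {
		return 0, counts // nothing to look up against
	}
	sort.Slice(ring, func(i, j int) bool { return ring[i].hash < ring[j].hash })

	for k := 0; k < numKeys; k++ {
		h := hash64("series-" + strconv.Itoa(k))
		// First section whose hash is >= the key hash, wrapping to the start.
		i := sort.Search(len(ring), func(i int) bool { return ring[i].hash >= h })
		if i == len(ring) {
			i = 0
		}
		counts[ring[i].owner]++
	}
	return len(ring), counts
}

func main() {
	endpoints := []string{"node-a", "node-b", "node-c"}
	for _, s := range []int{1, 10, 1000} {
		size, counts := keysPerNode(endpoints, s, 30000)
		fmt.Printf("sections/node=%d ring size=%d keys per node=%v\n", s, size, counts)
	}
}
```

The ring size grows linearly with the number of sections per node, which is the memory cost the comment refers to. With more sections, the per-node counts would be expected to move closer together.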

// insufficientNodesError is returned when a hashring does not
// have enough nodes to satisfy a request for a node.
type insufficientNodesError struct {
@@ -52,7 +69,7 @@ func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (stri
return string(s), nil
}

-// simpleHashring represents a group of nodes handling write requests.
+// simpleHashring represents a group of nodes handling write requests by hashmoding individual series.
type simpleHashring []string

// Get returns a target to handle the given tenant and time series.
@@ -69,6 +86,81 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
return s[(labelpb.HashWithPrefix(tenant, ts.Labels)+n)%uint64(len(s))], nil
}

type section struct {
endpointIndex uint64
hash uint64
}

type sections []section

func (p sections) Len() int { return len(p) }
func (p sections) Less(i, j int) bool { return p[i].hash < p[j].hash }
func (p sections) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p sections) Sort() { sort.Sort(p) }

// consistentHashring represents a group of nodes handling write requests with consistent hashing.
type consistentHashring struct {
endpoints []string
sections sections
numEndpoints uint64
}

func newConsistentHashring(endpoints []string, sectionsPerNode int) *consistentHashring {
// Replication works by choosing subsequent nodes in the ring.
// In order to improve consistency, we avoid relying on the ordering of the endpoints
// and sort them lexicographically.
sort.Strings(endpoints)

numSections := len(endpoints) * sectionsPerNode
ring := consistentHashring{
endpoints: endpoints,
sections: make(sections, 0, numSections),
numEndpoints: uint64(len(endpoints)),
}

hash := xxhash.New()
for endpointIndex, endpoint := range endpoints {
for i := 1; i <= sectionsPerNode; i++ {
_, _ = hash.Write([]byte(endpoint + ":" + strconv.Itoa(i)))
n := &section{
endpointIndex: uint64(endpointIndex),
hash: hash.Sum64(),
}

ring.sections = append(ring.sections, *n)
hash.Reset()
}
}
sort.Sort(ring.sections)
return &ring
}

func (c consistentHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) {
return c.GetN(tenant, ts, 0)
}

func (c consistentHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (string, error) {
if n >= c.numEndpoints {
return "", &insufficientNodesError{have: c.numEndpoints, want: n + 1}
}

v := labelpb.HashWithPrefix(tenant, ts.Labels)

var i uint64
i = uint64(sort.Search(len(c.sections), func(i int) bool {
return c.sections[i].hash >= v
}))

numSections := uint64(len(c.sections))
if i == numSections {
i = 0
}

nodeIndex := (c.sections[i].endpointIndex + n) % c.numEndpoints

return c.endpoints[nodeIndex], nil
}
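
The replica selection in GetN above, `(endpointIndex + n) % numEndpoints` over lexicographically sorted endpoints, can be isolated in a short sketch. The helper `replicas` and the endpoint names are hypothetical and only illustrate the index arithmetic. Unlike the diff, which calls `sort.Strings` on the slice it is given, the sketch sorts a copy so the caller's slice is left untouched.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// replicas returns the endpoints holding replicas 0..factor-1 for a key whose
// primary section belongs to the sorted endpoint at index primary.
func replicas(endpoints []string, primary, factor uint64) ([]string, error) {
	// Sort a copy so the result does not depend on the configured order.
	sorted := append([]string(nil), endpoints...)
	sort.Strings(sorted)

	num := uint64(len(sorted))
	if factor > num {
		return nil, errors.New("insufficient nodes for the requested replication factor")
	}

	out := make([]string, 0, factor)
	for n := uint64(0); n < factor; n++ {
		// Subsequent replicas go to subsequent endpoints, wrapping around.
		out = append(out, sorted[(primary+n)%num])
	}
	return out, nil
}

func main() {
	got, err := replicas([]string{"node-c", "node-a", "node-b"}, 2, 2)
	if err != nil {
		panic(err)
	}
	fmt.Println(got) // prints [node-c node-a]
}
```

Because each replica index is offset by a distinct `n` smaller than the number of endpoints, the replicas for one key always land on different endpoints.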

// multiHashring represents a set of hashrings.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
@@ -121,13 +213,24 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
// groups.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
-func newMultiHashring(cfg []HashringConfig) Hashring {
+func newMultiHashring(algorithm HashringAlgorithm, cfg []HashringConfig) Hashring {
m := &multiHashring{
cache: make(map[string]Hashring),
}

newHashring := func(endpoints []string) Hashring {
switch algorithm {
case AlgorithmHashmod:
return simpleHashring(endpoints)
case AlgorithmConsistent:
return newConsistentHashring(endpoints, SectionsPerNode)
default:
return simpleHashring(endpoints)
}
}

for _, h := range cfg {
-m.hashrings = append(m.hashrings, simpleHashring(h.Endpoints))
+m.hashrings = append(m.hashrings, newHashring(h.Endpoints))
var t map[string]struct{}
if len(h.Tenants) != 0 {
t = make(map[string]struct{})
@@ -147,7 +250,7 @@ func newMultiHashring(cfg []HashringConfig) Hashring {
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
// The updates chan is closed before exiting.
-func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) error {
+func HashringFromConfigWatcher(ctx context.Context, algorithm HashringAlgorithm, updates chan<- Hashring, cw *ConfigWatcher) error {
defer close(updates)
go cw.Run(ctx)

@@ -157,15 +260,15 @@ func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw
if !ok {
return errors.New("hashring config watcher stopped unexpectedly")
}
-updates <- newMultiHashring(cfg)
+updates <- newMultiHashring(algorithm, cfg)
case <-ctx.Done():
return ctx.Err()
}
}
}

// HashringFromConfig loads raw configuration content and returns a Hashring if the given configuration is valid.
-func HashringFromConfig(content string) (Hashring, error) {
+func HashringFromConfig(algorithm HashringAlgorithm, content string) (Hashring, error) {
config, err := parseConfig([]byte(content))
if err != nil {
return nil, errors.Wrapf(err, "failed to parse configuration")
@@ -176,5 +279,5 @@ func HashringFromConfig(content string) (Hashring, error) {
return nil, errors.Wrapf(err, "failed to load configuration")
}

-return newMultiHashring(config), err
+return newMultiHashring(algorithm, config), err
}
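
To see what the two algorithm values mean for series placement, the following standalone sketch compares a hashmod lookup with a ring lookup when a fourth endpoint is added. It is an illustration under stated assumptions, not the PR's implementation: `hash64`, `buildRing`, `ringLookup`, `modLookup`, `movedKeys` and the endpoint and key names are hypothetical, and SHA-256 replaces xxhash to keep the example within the standard library.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"sort"
	"strconv"
)

// hash64 is an assumption for this sketch, standing in for the hashes used in the diff.
func hash64(s string) uint64 {
	sum := sha256.Sum256([]byte(s))
	return binary.BigEndian.Uint64(sum[:8])
}

// point is one section of the ring, owned by a single endpoint.
type point struct {
	hash     uint64
	endpoint string
}

// buildRing places sectionsPerNode points per endpoint on the ring, sorted by hash.
func buildRing(endpoints []string, sectionsPerNode int) []point {
	ring := make([]point, 0, len(endpoints)*sectionsPerNode)
	for _, e := range endpoints {
		for i := 1; i <= sectionsPerNode; i++ {
			ring = append(ring, point{hash: hash64(e + ":" + strconv.Itoa(i)), endpoint: e})
		}
	}
	sort.Slice(ring, func(i, j int) bool { return ring[i].hash < ring[j].hash })
	return ring
}

// ringLookup returns the owner of the first point at or after the key's hash,
// wrapping around to the first point. The ring must be non-empty.
func ringLookup(ring []point, key string) string {
	h := hash64(key)
	i := sort.Search(len(ring), func(i int) bool { return ring[i].hash >= h })
	if i == len(ring) {
		i = 0
	}
	return ring[i].endpoint
}

// modLookup mirrors the hashmod idea: hash modulo the number of endpoints.
func modLookup(endpoints []string, key string) string {
	return endpoints[hash64(key)%uint64(len(endpoints))]
}

// movedKeys counts how many synthetic keys change owner between the two
// endpoint lists, once for hashmod and once for the ring.
func movedKeys(before, after []string, numKeys, sectionsPerNode int) (modMoved, ringMoved int) {
	ringBefore := buildRing(before, sectionsPerNode)
	ringAfter := buildRing(after, sectionsPerNode)
	for k := 0; k < numKeys; k++ {
		key := "series-" + strconv.Itoa(k)
		if modLookup(before, key) != modLookup(after, key) {
			modMoved++
		}
		if ringLookup(ringBefore, key) != ringLookup(ringAfter, key) {
			ringMoved++
		}
	}
	return modMoved, ringMoved
}

func main() {
	before := []string{"node-a", "node-b", "node-c"}
	after := []string{"node-a", "node-b", "node-c", "node-d"}
	modMoved, ringMoved := movedKeys(before, after, 10000, 1000)
	fmt.Printf("hashmod moved %d keys, ring moved %d keys\n", modMoved, ringMoved)
}
```

In this sketch, adding an endpoint only adds points to the ring, so a key either keeps its owner or moves to the new endpoint. With hashmod, the modulus itself changes, so most keys would be expected to change owner.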