Skip to content

Commit

Permalink
receive: Added Ketamo Consistent hashing (#5408)
Browse files Browse the repository at this point in the history
* Add support for consistent hashing in receivers

This commit adds support for distributing series in Receivers using
consistent hashing based on the libketama algorithm.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Use require package for test assertions

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Rename algorithm from consistent to ketama

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Jun 16, 2022
1 parent d095a00 commit 0d15bc0
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5337](https://github.com/thanos-io/thanos/pull/5337) Thanos Object Store: Add the `prefix` option to buckets
- [#5352](https://github.com/thanos-io/thanos/pull/5352) Cache: Add cache metrics to groupcache.
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support.
- [#5408](https://github.com/thanos-io/thanos/pull/5391) Receive: Add support for consistent hashrings.

### Changed

Expand Down
11 changes: 9 additions & 2 deletions cmd/thanos/receive.go
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"path"
"strings"
"time"

extflag "github.com/efficientgo/tools/extkingpin"
Expand Down Expand Up @@ -426,7 +427,7 @@ func setupHashring(g *run.Group,
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
level.Info(logger).Log("msg", "the hashring initialized with config watcher.")
return receive.HashringFromConfigWatcher(ctx, updates, cw)
return receive.HashringFromConfigWatcher(ctx, receive.HashringAlgorithm(conf.hashringsAlgorithm), updates, cw)
}, func(error) {
cancel()
})
Expand All @@ -437,7 +438,7 @@ func setupHashring(g *run.Group,
)
// The Hashrings config file content given initialize configuration from content.
if len(conf.hashringsFileContent) > 0 {
ring, err = receive.HashringFromConfig(conf.hashringsFileContent)
ring, err = receive.HashringFromConfig(receive.HashringAlgorithm(conf.hashringsAlgorithm), conf.hashringsFileContent)
if err != nil {
close(updates)
return errors.Wrap(err, "failed to validate hashring configuration file")
Expand Down Expand Up @@ -718,6 +719,7 @@ type receiveConfig struct {

hashringsFilePath string
hashringsFileContent string
hashringsAlgorithm string

refreshInterval *model.Duration
endpoint string
Expand Down Expand Up @@ -779,6 +781,11 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("<content>").StringVar(&rc.hashringsFileContent)

hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)}, ", ")
cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings. Must be one of "+hashringAlgorithmsHelptext).
Default(string(receive.AlgorithmHashmod)).
EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama))

rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)").
Default("5m"))

Expand Down
3 changes: 3 additions & 0 deletions docs/components/receive.md
Expand Up @@ -127,6 +127,9 @@ Flags:
Alternative to 'receive.hashrings-file' flag
(lower priority). Content of file that contains
the hashring configuration.
--receive.hashrings-algorithm=hashmod
The algorithm used when distributing series in
the hashrings. Must be one of hashmod, ketama
--receive.hashrings-file=<path>
Path to file that contains the hashring
configuration. A watcher is initialized to
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Expand Up @@ -98,6 +98,8 @@ require (
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)

require github.com/stretchr/testify v1.7.0

require (
cloud.google.com/go v0.99.0 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
Expand Down Expand Up @@ -186,7 +188,6 @@ require (
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/sony/gobreaker v0.4.1 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect
github.com/weaveworks/promrus v1.2.0 // indirect
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect
go.elastic.co/apm/module/apmhttp v1.11.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/handler_test.go
Expand Up @@ -324,7 +324,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint)
peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h}
}
hashring := newMultiHashring(cfg)
hashring := newMultiHashring(AlgorithmHashmod, cfg)
for _, h := range handlers {
h.Hashring(hashring)
}
Expand Down
117 changes: 110 additions & 7 deletions pkg/receive/hashring.go
Expand Up @@ -6,14 +6,31 @@ package receive
import (
"context"
"fmt"
"sort"
"strconv"
"sync"

"github.com/cespare/xxhash"

"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/store/labelpb"

"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

// HashringAlgorithm is the algorithm used to distribute series in the ring.
type HashringAlgorithm string

const (
AlgorithmHashmod HashringAlgorithm = "hashmod"
AlgorithmKetama HashringAlgorithm = "ketama"

// SectionsPerNode is the number of sections in the ring assigned to each node
// in the ketama hashring. A higher number yields a better series distribution,
// but also comes with a higher memory cost.
SectionsPerNode = 1000
)

// insufficientNodesError is returned when a hashring does not
// have enough nodes to satisfy a request for a node.
type insufficientNodesError struct {
Expand Down Expand Up @@ -52,7 +69,7 @@ func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (stri
return string(s), nil
}

// simpleHashring represents a group of nodes handling write requests.
// simpleHashring represents a group of nodes handling write requests by hashmoding individual series.
type simpleHashring []string

// Get returns a target to handle the given tenant and time series.
Expand All @@ -69,6 +86,81 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
return s[(labelpb.HashWithPrefix(tenant, ts.Labels)+n)%uint64(len(s))], nil
}

type section struct {
endpointIndex uint64
hash uint64
}

type sections []section

func (p sections) Len() int { return len(p) }
func (p sections) Less(i, j int) bool { return p[i].hash < p[j].hash }
func (p sections) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p sections) Sort() { sort.Sort(p) }

// ketamaHashring represents a group of nodes handling write requests with consistent hashing.
type ketamaHashring struct {
endpoints []string
sections sections
numEndpoints uint64
}

func newKetamaHashring(endpoints []string, sectionsPerNode int) *ketamaHashring {
// Replication works by choosing subsequent nodes in the ring.
// In order to improve consistency, we avoid relying on the ordering of the endpoints
// and sort them lexicographically.
sort.Strings(endpoints)

numSections := len(endpoints) * sectionsPerNode
ring := ketamaHashring{
endpoints: endpoints,
sections: make(sections, 0, numSections),
numEndpoints: uint64(len(endpoints)),
}

hash := xxhash.New()
for endpointIndex, endpoint := range endpoints {
for i := 1; i <= sectionsPerNode; i++ {
_, _ = hash.Write([]byte(endpoint + ":" + strconv.Itoa(i)))
n := &section{
endpointIndex: uint64(endpointIndex),
hash: hash.Sum64(),
}

ring.sections = append(ring.sections, *n)
hash.Reset()
}
}
sort.Sort(ring.sections)
return &ring
}

func (c ketamaHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) {
return c.GetN(tenant, ts, 0)
}

func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (string, error) {
if n >= c.numEndpoints {
return "", &insufficientNodesError{have: c.numEndpoints, want: n + 1}
}

v := labelpb.HashWithPrefix(tenant, ts.Labels)

var i uint64
i = uint64(sort.Search(len(c.sections), func(i int) bool {
return c.sections[i].hash >= v
}))

numSections := uint64(len(c.sections))
if i == numSections {
i = 0
}

nodeIndex := (c.sections[i].endpointIndex + n) % c.numEndpoints

return c.endpoints[nodeIndex], nil
}

// multiHashring represents a set of hashrings.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
Expand Down Expand Up @@ -121,13 +213,24 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
// groups.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
func newMultiHashring(cfg []HashringConfig) Hashring {
func newMultiHashring(algorithm HashringAlgorithm, cfg []HashringConfig) Hashring {
m := &multiHashring{
cache: make(map[string]Hashring),
}

newHashring := func(endpoints []string) Hashring {
switch algorithm {
case AlgorithmHashmod:
return simpleHashring(endpoints)
case AlgorithmKetama:
return newKetamaHashring(endpoints, SectionsPerNode)
default:
return simpleHashring(endpoints)
}
}

for _, h := range cfg {
m.hashrings = append(m.hashrings, simpleHashring(h.Endpoints))
m.hashrings = append(m.hashrings, newHashring(h.Endpoints))
var t map[string]struct{}
if len(h.Tenants) != 0 {
t = make(map[string]struct{})
Expand All @@ -147,7 +250,7 @@ func newMultiHashring(cfg []HashringConfig) Hashring {
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
// The updates chan is closed before exiting.
func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) error {
func HashringFromConfigWatcher(ctx context.Context, algorithm HashringAlgorithm, updates chan<- Hashring, cw *ConfigWatcher) error {
defer close(updates)
go cw.Run(ctx)

Expand All @@ -157,15 +260,15 @@ func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw
if !ok {
return errors.New("hashring config watcher stopped unexpectedly")
}
updates <- newMultiHashring(cfg)
updates <- newMultiHashring(algorithm, cfg)
case <-ctx.Done():
return ctx.Err()
}
}
}

// HashringFromConfig loads raw configuration content and returns a Hashring if the given configuration is not valid.
func HashringFromConfig(content string) (Hashring, error) {
func HashringFromConfig(algorithm HashringAlgorithm, content string) (Hashring, error) {
config, err := parseConfig([]byte(content))
if err != nil {
return nil, errors.Wrapf(err, "failed to parse configuration")
Expand All @@ -176,5 +279,5 @@ func HashringFromConfig(content string) (Hashring, error) {
return nil, errors.Wrapf(err, "failed to load configuration")
}

return newMultiHashring(config), err
return newMultiHashring(algorithm, config), err
}

0 comments on commit 0d15bc0

Please sign in to comment.