Skip to content

Commit

Permalink
Add support for consistent hashing in receivers
Browse files Browse the repository at this point in the history
This commit adds support for distributing series in Receivers using
consistent hashing based on the libketama algorithm.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Jun 8, 2022
1 parent 586449e commit ec6b963
Show file tree
Hide file tree
Showing 6 changed files with 332 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -18,6 +18,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#5352](https://github.com/thanos-io/thanos/pull/5352) Cache: Add cache metrics to groupcache.
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support.
- [#5408](https://github.com/thanos-io/thanos/pull/5391) Receive: Add support for consistent hashrings.

### Changed

Expand Down
7 changes: 5 additions & 2 deletions cmd/thanos/receive.go
Expand Up @@ -426,7 +426,7 @@ func setupHashring(g *run.Group,
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
level.Info(logger).Log("msg", "the hashring initialized with config watcher.")
return receive.HashringFromConfigWatcher(ctx, updates, cw)
return receive.HashringFromConfigWatcher(ctx, receive.HashringAlgorithm(conf.hashringsAlgorithm), updates, cw)
}, func(error) {
cancel()
})
Expand All @@ -437,7 +437,7 @@ func setupHashring(g *run.Group,
)
// The Hashrings config file content given initialize configuration from content.
if len(conf.hashringsFileContent) > 0 {
ring, err = receive.HashringFromConfig(conf.hashringsFileContent)
ring, err = receive.HashringFromConfig(receive.HashringAlgorithm(conf.hashringsAlgorithm), conf.hashringsFileContent)
if err != nil {
close(updates)
return errors.Wrap(err, "failed to validate hashring configuration file")
Expand Down Expand Up @@ -718,6 +718,7 @@ type receiveConfig struct {

hashringsFilePath string
hashringsFileContent string
hashringsAlgorithm string

refreshInterval *model.Duration
endpoint string
Expand Down Expand Up @@ -779,6 +780,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("<content>").StringVar(&rc.hashringsFileContent)

cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings.").Default(string(receive.AlgorithmHashmod)).EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmConsistent))

rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)").
Default("5m"))

Expand Down
3 changes: 3 additions & 0 deletions docs/components/receive.md
Expand Up @@ -126,6 +126,9 @@ Flags:
Alternative to 'receive.hashrings-file' flag
(lower priority). Content of file that contains
the hashring configuration.
--receive.hashrings-algorithm=hashmod
The algorithm used when distributing series in
the hashrings.
--receive.hashrings-file=<path>
Path to file that contains the hashring
configuration. A watcher is initialized to
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/handler_test.go
Expand Up @@ -324,7 +324,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint)
peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h}
}
hashring := newMultiHashring(cfg)
hashring := newMultiHashring(AlgorithmHashmod, cfg)
for _, h := range handlers {
h.Hashring(hashring)
}
Expand Down
118 changes: 111 additions & 7 deletions pkg/receive/hashring.go
Expand Up @@ -6,14 +6,31 @@ package receive
import (
"context"
"fmt"
"sort"
"strconv"
"sync"

"github.com/cespare/xxhash"

"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/store/labelpb"

"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

// HashringAlgorithm is the algorithm used to distribute series in the ring.
type HashringAlgorithm string

const (
AlgorithmHashmod HashringAlgorithm = "hashmod"
AlgorithmConsistent HashringAlgorithm = "consistent"

// SectionsPerNode is the number of sections in the ring assigned to each node
// when using consistent hashing. A higher number yields a better series distribution,
// but also comes with a higher memory cost.
SectionsPerNode = 1000
)

// insufficientNodesError is returned when a hashring does not
// have enough nodes to satisfy a request for a node.
type insufficientNodesError struct {
Expand Down Expand Up @@ -52,7 +69,7 @@ func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (stri
return string(s), nil
}

// simpleHashring represents a group of nodes handling write requests.
// simpleHashring represents a group of nodes handling write requests by hashmoding individual series.
type simpleHashring []string

// Get returns a target to handle the given tenant and time series.
Expand All @@ -69,6 +86,82 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
return s[(labelpb.HashWithPrefix(tenant, ts.Labels)+n)%uint64(len(s))], nil
}

type section struct {
endpointIndex uint64
hash uint64
}

type sections []section

func (p sections) Len() int { return len(p) }
func (p sections) Less(i, j int) bool { return p[i].hash < p[j].hash }
func (p sections) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p sections) Sort() { sort.Sort(p) }

// consistentHashring represents a group of nodes handling write requests with consistent hashing.
type consistentHashring struct {
endpoints []string
sections sections
numEndpoints uint64
}

func newConsistentHashring(endpoints []string, sectionsPerNode int) *consistentHashring {
// Replication works by choosing subsequent nodes in the ring.
// In order to improve consistency, we avoid relying on the ordering of the endpoints
// and sort them lexicographically.
sort.Strings(endpoints)

numSections := len(endpoints) * sectionsPerNode
ring := consistentHashring{
endpoints: endpoints,
sections: make(sections, 0, numSections),
numEndpoints: uint64(len(endpoints)),
}

hash := xxhash.New()
for endpointIndex, endpoint := range endpoints {
for i := 1; i <= sectionsPerNode; i++ {
_, _ = hash.Write([]byte(endpoint + ":" + strconv.Itoa(i)))
n := &section{
endpointIndex: uint64(endpointIndex),
hash: hash.Sum64(),
}

ring.sections = append(ring.sections, *n)
hash.Reset()
}
}
ring.sections.Sort()

return &ring
}

func (c consistentHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) {
return c.GetN(tenant, ts, 0)
}

func (c consistentHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (string, error) {
if n >= c.numEndpoints {
return "", &insufficientNodesError{have: c.numEndpoints, want: n + 1}
}

v := labelpb.HashWithPrefix(tenant, ts.Labels)

var i uint64
i = uint64(sort.Search(len(c.sections), func(i int) bool {
return c.sections[i].hash >= v
}))

numSections := uint64(len(c.sections))
if i == numSections {
i = 0
}

nodeIndex := (c.sections[i].endpointIndex + n) % c.numEndpoints

return c.endpoints[nodeIndex], nil
}

// multiHashring represents a set of hashrings.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
Expand Down Expand Up @@ -121,13 +214,24 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
// groups.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
func newMultiHashring(cfg []HashringConfig) Hashring {
func newMultiHashring(algorithm HashringAlgorithm, cfg []HashringConfig) Hashring {
m := &multiHashring{
cache: make(map[string]Hashring),
}

newHashring := func(endpoints []string) Hashring {
switch algorithm {
case AlgorithmHashmod:
return simpleHashring(endpoints)
case AlgorithmConsistent:
return newConsistentHashring(endpoints, SectionsPerNode)
default:
return simpleHashring(endpoints)
}
}

for _, h := range cfg {
m.hashrings = append(m.hashrings, simpleHashring(h.Endpoints))
m.hashrings = append(m.hashrings, newHashring(h.Endpoints))
var t map[string]struct{}
if len(h.Tenants) != 0 {
t = make(map[string]struct{})
Expand All @@ -147,7 +251,7 @@ func newMultiHashring(cfg []HashringConfig) Hashring {
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
// The updates chan is closed before exiting.
func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) error {
func HashringFromConfigWatcher(ctx context.Context, algorithm HashringAlgorithm, updates chan<- Hashring, cw *ConfigWatcher) error {
defer close(updates)
go cw.Run(ctx)

Expand All @@ -157,15 +261,15 @@ func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw
if !ok {
return errors.New("hashring config watcher stopped unexpectedly")
}
updates <- newMultiHashring(cfg)
updates <- newMultiHashring(algorithm, cfg)
case <-ctx.Done():
return ctx.Err()
}
}
}

// HashringFromConfig loads raw configuration content and returns a Hashring if the given configuration is not valid.
func HashringFromConfig(content string) (Hashring, error) {
func HashringFromConfig(algorithm HashringAlgorithm, content string) (Hashring, error) {
config, err := parseConfig([]byte(content))
if err != nil {
return nil, errors.Wrapf(err, "failed to parse configuration")
Expand All @@ -176,5 +280,5 @@ func HashringFromConfig(content string) (Hashring, error) {
return nil, errors.Wrapf(err, "failed to load configuration")
}

return newMultiHashring(config), err
return newMultiHashring(algorithm, config), err
}

0 comments on commit ec6b963

Please sign in to comment.