Skip to content

Commit

Permalink
Rename algorithm from consistent to ketama
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Jun 16, 2022
1 parent 0ccb5b0 commit 152ce39
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 15 deletions.
6 changes: 5 additions & 1 deletion cmd/thanos/receive.go
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"path"
"strings"
"time"

extflag "github.com/efficientgo/tools/extkingpin"
Expand Down Expand Up @@ -780,7 +781,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("<content>").StringVar(&rc.hashringsFileContent)

cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings.").Default(string(receive.AlgorithmHashmod)).EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmConsistent))
hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)}, ", ")
cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings. Must be one of "+hashringAlgorithmsHelptext).
Default(string(receive.AlgorithmHashmod)).
EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama))

rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)").
Default("5m"))
Expand Down
2 changes: 1 addition & 1 deletion docs/components/receive.md
Expand Up @@ -128,7 +128,7 @@ Flags:
the hashring configuration.
--receive.hashrings-algorithm=hashmod
The algorithm used when distributing series in
the hashrings.
the hashrings. Must be one of hashmod, ketama
--receive.hashrings-file=<path>
Path to file that contains the hashring
configuration. A watcher is initialized to
Expand Down
3 changes: 3 additions & 0 deletions docs/components/tools.md
Expand Up @@ -101,6 +101,7 @@ type: GCS
config:
bucket: ""
service_account: ""
prefix: ""
```

Bucket can be extended to add more subcommands that will be helpful when working with object storage buckets by adding a new command within [`/cmd/thanos/tools_bucket.go`](../../cmd/thanos/tools_bucket.go) .
Expand Down Expand Up @@ -601,6 +602,7 @@ type: GCS
config:
bucket: ""
service_account: ""
prefix: ""
```

```$ mdox-exec="thanos tools bucket downsample --help"
Expand Down Expand Up @@ -675,6 +677,7 @@ type: GCS
config:
bucket: ""
service_account: ""
prefix: ""
```

```$ mdox-exec="thanos tools bucket mark --help"
Expand Down
22 changes: 11 additions & 11 deletions pkg/receive/hashring.go
Expand Up @@ -22,11 +22,11 @@ import (
type HashringAlgorithm string

const (
AlgorithmHashmod HashringAlgorithm = "hashmod"
AlgorithmConsistent HashringAlgorithm = "consistent"
AlgorithmHashmod HashringAlgorithm = "hashmod"
AlgorithmKetama HashringAlgorithm = "ketama"

// SectionsPerNode is the number of sections in the ring assigned to each node
// when using consistent hashing. A higher number yields a better series distribution,
// in the ketama hashring. A higher number yields a better series distribution,
// but also comes with a higher memory cost.
SectionsPerNode = 1000
)
Expand Down Expand Up @@ -98,21 +98,21 @@ func (p sections) Less(i, j int) bool { return p[i].hash < p[j].hash }
func (p sections) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p sections) Sort() { sort.Sort(p) }

// consistentHashring represents a group of nodes handling write requests with consistent hashing.
type consistentHashring struct {
// ketamaHashring represents a group of nodes handling write requests with consistent hashing.
type ketamaHashring struct {
endpoints []string
sections sections
numEndpoints uint64
}

func newConsistentHashring(endpoints []string, sectionsPerNode int) *consistentHashring {
func newKetamaHashring(endpoints []string, sectionsPerNode int) *ketamaHashring {
// Replication works by choosing subsequent nodes in the ring.
// In order to improve consistency, we avoid relying on the ordering of the endpoints
// and sort them lexicographically.
sort.Strings(endpoints)

numSections := len(endpoints) * sectionsPerNode
ring := consistentHashring{
ring := ketamaHashring{
endpoints: endpoints,
sections: make(sections, 0, numSections),
numEndpoints: uint64(len(endpoints)),
Expand All @@ -135,11 +135,11 @@ func newConsistentHashring(endpoints []string, sectionsPerNode int) *consistentH
return &ring
}

func (c consistentHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) {
func (c ketamaHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) {
return c.GetN(tenant, ts, 0)
}

func (c consistentHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (string, error) {
func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (string, error) {
if n >= c.numEndpoints {
return "", &insufficientNodesError{have: c.numEndpoints, want: n + 1}
}
Expand Down Expand Up @@ -222,8 +222,8 @@ func newMultiHashring(algorithm HashringAlgorithm, cfg []HashringConfig) Hashrin
switch algorithm {
case AlgorithmHashmod:
return simpleHashring(endpoints)
case AlgorithmConsistent:
return newConsistentHashring(endpoints, SectionsPerNode)
case AlgorithmKetama:
return newKetamaHashring(endpoints, SectionsPerNode)
default:
return simpleHashring(endpoints)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/receive/hashring_test.go
Expand Up @@ -226,7 +226,7 @@ func TestConsistentHashringGet(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
hashRing := newConsistentHashring(test.nodes, 10)
hashRing := newKetamaHashring(test.nodes, 10)
result, err := hashRing.GetN("tenant", test.ts, test.n)
require.NoError(t, err)
require.Equal(t, test.expectedNode, result)
Expand Down Expand Up @@ -322,7 +322,7 @@ func findSeries(initialAssignments map[string][]*prompb.TimeSeries, node string,
}

func assignSeries(series []*prompb.TimeSeries, nodes []string) (map[string][]*prompb.TimeSeries, error) {
hashRing := newConsistentHashring(nodes, SectionsPerNode)
hashRing := newKetamaHashring(nodes, SectionsPerNode)
assignments := make(map[string][]*prompb.TimeSeries)
for _, ts := range series {
result, err := hashRing.Get("tenant", ts)
Expand Down

0 comments on commit 152ce39

Please sign in to comment.