diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index eed1981fea..e6228c26fd 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "os" "path" + "strings" "time" extflag "github.com/efficientgo/tools/extkingpin" @@ -780,7 +781,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("").StringVar(&rc.hashringsFileContent) - cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings.").Default(string(receive.AlgorithmHashmod)).EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmConsistent)) + hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)}, ", ") + cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings. Must be one of "+hashringAlgorithmsHelptext). + Default(string(receive.AlgorithmHashmod)). + EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)) rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)"). Default("5m")) diff --git a/docs/components/receive.md b/docs/components/receive.md index 792616ad21..58dce7eb39 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -128,7 +128,7 @@ Flags: the hashring configuration. --receive.hashrings-algorithm=hashmod The algorithm used when distributing series in - the hashrings. + the hashrings. Must be one of hashmod, ketama --receive.hashrings-file= Path to file that contains the hashring configuration. A watcher is initialized to diff --git a/docs/components/tools.md b/docs/components/tools.md index 80ceed9585..776164aeb0 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -101,6 +101,7 @@ type: GCS config: bucket: "" service_account: "" +prefix: "" ``` Bucket can be extended to add more subcommands that will be helpful when working with object storage buckets by adding a new command within [`/cmd/thanos/tools_bucket.go`](../../cmd/thanos/tools_bucket.go) . @@ -601,6 +602,7 @@ type: GCS config: bucket: "" service_account: "" +prefix: "" ``` ```$ mdox-exec="thanos tools bucket downsample --help" @@ -675,6 +677,7 @@ type: GCS config: bucket: "" service_account: "" +prefix: "" ``` ```$ mdox-exec="thanos tools bucket mark --help" diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index f7eb719418..cb4c3e78e2 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -22,11 +22,11 @@ import ( type HashringAlgorithm string const ( - AlgorithmHashmod HashringAlgorithm = "hashmod" - AlgorithmConsistent HashringAlgorithm = "consistent" + AlgorithmHashmod HashringAlgorithm = "hashmod" + AlgorithmKetama HashringAlgorithm = "ketama" // SectionsPerNode is the number of sections in the ring assigned to each node - // when using consistent hashing. A higher number yields a better series distribution, + // in the ketama hashring. A higher number yields a better series distribution, // but also comes with a higher memory cost. SectionsPerNode = 1000 ) @@ -98,21 +98,21 @@ func (p sections) Less(i, j int) bool { return p[i].hash < p[j].hash } func (p sections) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p sections) Sort() { sort.Sort(p) } -// consistentHashring represents a group of nodes handling write requests with consistent hashing. -type consistentHashring struct { +// ketamaHashring represents a group of nodes handling write requests with consistent hashing. +type ketamaHashring struct { endpoints []string sections sections numEndpoints uint64 } -func newConsistentHashring(endpoints []string, sectionsPerNode int) *consistentHashring { +func newKetamaHashring(endpoints []string, sectionsPerNode int) *ketamaHashring { // Replication works by choosing subsequent nodes in the ring. // In order to improve consistency, we avoid relying on the ordering of the endpoints // and sort them lexicographically. sort.Strings(endpoints) numSections := len(endpoints) * sectionsPerNode - ring := consistentHashring{ + ring := ketamaHashring{ endpoints: endpoints, sections: make(sections, 0, numSections), numEndpoints: uint64(len(endpoints)), @@ -135,11 +135,11 @@ func newConsistentHashring(endpoints []string, sectionsPerNode int) *consistentH return &ring } -func (c consistentHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) { +func (c ketamaHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) { return c.GetN(tenant, ts, 0) } -func (c consistentHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (string, error) { +func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (string, error) { if n >= c.numEndpoints { return "", &insufficientNodesError{have: c.numEndpoints, want: n + 1} } @@ -222,8 +222,8 @@ func newMultiHashring(algorithm HashringAlgorithm, cfg []HashringConfig) Hashrin switch algorithm { case AlgorithmHashmod: return simpleHashring(endpoints) - case AlgorithmConsistent: - return newConsistentHashring(endpoints, SectionsPerNode) + case AlgorithmKetama: + return newKetamaHashring(endpoints, SectionsPerNode) default: return simpleHashring(endpoints) } diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index f15ca5ce59..676aaf8c07 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -226,7 +226,7 @@ func TestConsistentHashringGet(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - hashRing := newConsistentHashring(test.nodes, 10) + hashRing := newKetamaHashring(test.nodes, 10) result, err := hashRing.GetN("tenant", test.ts, test.n) require.NoError(t, err) require.Equal(t, test.expectedNode, result) @@ -322,7 +322,7 @@ func findSeries(initialAssignments map[string][]*prompb.TimeSeries, node string, } func assignSeries(series []*prompb.TimeSeries, nodes []string) (map[string][]*prompb.TimeSeries, error) { - hashRing := newConsistentHashring(nodes, SectionsPerNode) + hashRing := newKetamaHashring(nodes, SectionsPerNode) assignments := make(map[string][]*prompb.TimeSeries) for _, ts := range series { result, err := hashRing.Get("tenant", ts)