diff --git a/CHANGELOG.md b/CHANGELOG.md index 70e335f1f74..0033cd769f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5352](https://github.com/thanos-io/thanos/pull/5352) Cache: Add cache metrics to groupcache. - [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support. +- [#5408](https://github.com/thanos-io/thanos/pull/5391) Receive: Add support for consistent hashrings. ### Changed diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 6ed11a28359..eed1981feac 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -426,7 +426,7 @@ func setupHashring(g *run.Group, ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { level.Info(logger).Log("msg", "the hashring initialized with config watcher.") - return receive.HashringFromConfigWatcher(ctx, updates, cw) + return receive.HashringFromConfigWatcher(ctx, receive.HashringAlgorithm(conf.hashringsAlgorithm), updates, cw) }, func(error) { cancel() }) @@ -437,7 +437,7 @@ func setupHashring(g *run.Group, ) // The Hashrings config file content given initialize configuration from content. if len(conf.hashringsFileContent) > 0 { - ring, err = receive.HashringFromConfig(conf.hashringsFileContent) + ring, err = receive.HashringFromConfig(receive.HashringAlgorithm(conf.hashringsAlgorithm), conf.hashringsFileContent) if err != nil { close(updates) return errors.Wrap(err, "failed to validate hashring configuration file") @@ -718,6 +718,7 @@ type receiveConfig struct { hashringsFilePath string hashringsFileContent string + hashringsAlgorithm string refreshInterval *model.Duration endpoint string @@ -779,6 +780,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("").StringVar(&rc.hashringsFileContent) + cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings.").Default(string(receive.AlgorithmHashmod)).EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmConsistent)) + rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)"). Default("5m")) diff --git a/docs/components/receive.md b/docs/components/receive.md index 16dfd8a42fe..792616ad21d 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -126,6 +126,9 @@ Flags: Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration. + --receive.hashrings-algorithm=hashmod + The algorithm used when distributing series in + the hashrings. --receive.hashrings-file= Path to file that contains the hashring configuration. A watcher is initialized to diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 6bc0bcf8b0c..422abfe7f77 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -324,7 +324,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint) peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h} } - hashring := newMultiHashring(cfg) + hashring := newMultiHashring(AlgorithmHashmod, cfg) for _, h := range handlers { h.Hashring(hashring) } diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index 520a610d0fe..b454c729946 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -6,14 +6,31 @@ package receive import ( "context" "fmt" + "sort" + "strconv" "sync" + "github.com/cespare/xxhash" + "github.com/pkg/errors" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) +// HashringAlgorithm is the algorithm used to distribute series in the ring. +type HashringAlgorithm string + +const ( + AlgorithmHashmod HashringAlgorithm = "hashmod" + AlgorithmConsistent HashringAlgorithm = "consistent" + + // SectionsPerNode is the number of sections in the ring assigned to each node + // when using consistent hashing. A higher number yields a better series distribution, + // but also comes with a higher memory cost. + SectionsPerNode = 1000 +) + // insufficientNodesError is returned when a hashring does not // have enough nodes to satisfy a request for a node. type insufficientNodesError struct { @@ -52,7 +69,7 @@ func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (stri return string(s), nil } -// simpleHashring represents a group of nodes handling write requests. +// simpleHashring represents a group of nodes handling write requests by hashmoding individual series. type simpleHashring []string // Get returns a target to handle the given tenant and time series. @@ -69,6 +86,82 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st return s[(labelpb.HashWithPrefix(tenant, ts.Labels)+n)%uint64(len(s))], nil } +type section struct { + endpointIndex uint64 + hash uint64 +} + +type sections []section + +func (p sections) Len() int { return len(p) } +func (p sections) Less(i, j int) bool { return p[i].hash < p[j].hash } +func (p sections) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (p sections) Sort() { sort.Sort(p) } + +// consistentHashring represents a group of nodes handling write requests with consistent hashing. +type consistentHashring struct { + endpoints []string + sections sections + numEndpoints uint64 +} + +func newConsistentHashring(endpoints []string, sectionsPerNode int) *consistentHashring { + // Replication works by choosing subsequent nodes in the ring. + // In order to improve consistency, we avoid relying on the ordering of the endpoints + // and sort them lexicographically. + sort.Strings(endpoints) + + numSections := len(endpoints) * sectionsPerNode + ring := consistentHashring{ + endpoints: endpoints, + sections: make(sections, 0, numSections), + numEndpoints: uint64(len(endpoints)), + } + + hash := xxhash.New() + for endpointIndex, endpoint := range endpoints { + for i := 1; i <= sectionsPerNode; i++ { + _, _ = hash.Write([]byte(endpoint + ":" + strconv.Itoa(i))) + n := §ion{ + endpointIndex: uint64(endpointIndex), + hash: hash.Sum64(), + } + + ring.sections = append(ring.sections, *n) + hash.Reset() + } + } + ring.sections.Sort() + + return &ring +} + +func (c consistentHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) { + return c.GetN(tenant, ts, 0) +} + +func (c consistentHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (string, error) { + if n >= c.numEndpoints { + return "", &insufficientNodesError{have: c.numEndpoints, want: n + 1} + } + + v := labelpb.HashWithPrefix(tenant, ts.Labels) + + var i uint64 + i = uint64(sort.Search(len(c.sections), func(i int) bool { + return c.sections[i].hash >= v + })) + + numSections := uint64(len(c.sections)) + if i == numSections { + i = 0 + } + + nodeIndex := (c.sections[i].endpointIndex + n) % c.numEndpoints + + return c.endpoints[nodeIndex], nil +} + // multiHashring represents a set of hashrings. // Which hashring to use for a tenant is determined // by the tenants field of the hashring configuration. @@ -121,13 +214,24 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st // groups. // Which hashring to use for a tenant is determined // by the tenants field of the hashring configuration. -func newMultiHashring(cfg []HashringConfig) Hashring { +func newMultiHashring(algorithm HashringAlgorithm, cfg []HashringConfig) Hashring { m := &multiHashring{ cache: make(map[string]Hashring), } + newHashring := func(endpoints []string) Hashring { + switch algorithm { + case AlgorithmHashmod: + return simpleHashring(endpoints) + case AlgorithmConsistent: + return newConsistentHashring(endpoints, SectionsPerNode) + default: + return simpleHashring(endpoints) + } + } + for _, h := range cfg { - m.hashrings = append(m.hashrings, simpleHashring(h.Endpoints)) + m.hashrings = append(m.hashrings, newHashring(h.Endpoints)) var t map[string]struct{} if len(h.Tenants) != 0 { t = make(map[string]struct{}) @@ -147,7 +251,7 @@ func newMultiHashring(cfg []HashringConfig) Hashring { // Which hashring to use for a tenant is determined // by the tenants field of the hashring configuration. // The updates chan is closed before exiting. -func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) error { +func HashringFromConfigWatcher(ctx context.Context, algorithm HashringAlgorithm, updates chan<- Hashring, cw *ConfigWatcher) error { defer close(updates) go cw.Run(ctx) @@ -157,7 +261,7 @@ func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw if !ok { return errors.New("hashring config watcher stopped unexpectedly") } - updates <- newMultiHashring(cfg) + updates <- newMultiHashring(algorithm, cfg) case <-ctx.Done(): return ctx.Err() } @@ -165,7 +269,7 @@ func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw } // HashringFromConfig loads raw configuration content and returns a Hashring if the given configuration is not valid. -func HashringFromConfig(content string) (Hashring, error) { +func HashringFromConfig(algorithm HashringAlgorithm, content string) (Hashring, error) { config, err := parseConfig([]byte(content)) if err != nil { return nil, errors.Wrapf(err, "failed to parse configuration") @@ -176,5 +280,5 @@ func HashringFromConfig(content string) (Hashring, error) { return nil, errors.Wrapf(err, "failed to load configuration") } - return newMultiHashring(config), err + return newMultiHashring(algorithm, config), err } diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index f5a1411a5be..929025f7858 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -4,8 +4,11 @@ package receive import ( + "fmt" "testing" + "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -131,7 +134,7 @@ func TestHashringGet(t *testing.T) { }, }, } { - hs := newMultiHashring(tc.cfg) + hs := newMultiHashring(AlgorithmHashmod, tc.cfg) h, err := hs.Get(tc.tenant, ts) if tc.nodes != nil { if err != nil { @@ -148,3 +151,210 @@ func TestHashringGet(t *testing.T) { } } } + +func TestConsistentHashringGet(t *testing.T) { + baseTS := &prompb.TimeSeries{ + Labels: []labelpb.ZLabel{ + { + Name: "pod", + Value: "nginx", + }, + }, + } + tests := []struct { + name string + nodes []string + expectedNode string + ts *prompb.TimeSeries + n uint64 + }{ + { + name: "base case", + nodes: []string{"node-1", "node-2", "node-3"}, + ts: baseTS, + expectedNode: "node-2", + }, + { + name: "base case with replication", + nodes: []string{"node-1", "node-2", "node-3"}, + ts: baseTS, + n: 1, + expectedNode: "node-3", + }, + { + name: "base case with replication", + nodes: []string{"node-1", "node-2", "node-3"}, + ts: baseTS, + n: 2, + expectedNode: "node-1", + }, + { + name: "base case with replication and reordered nodes", + nodes: []string{"node-1", "node-3", "node-2"}, + ts: baseTS, + n: 2, + expectedNode: "node-1", + }, + { + name: "base case with new node at beginning of ring", + nodes: []string{"node-0", "node-1", "node-2", "node-3"}, + ts: baseTS, + expectedNode: "node-2", + }, + { + name: "base case with new node at end of ring", + nodes: []string{"node-1", "node-2", "node-3", "node-4"}, + ts: baseTS, + expectedNode: "node-2", + }, + { + name: "base case with different timeseries", + nodes: []string{"node-1", "node-2", "node-3"}, + ts: &prompb.TimeSeries{ + Labels: []labelpb.ZLabel{ + { + Name: "pod", + Value: "thanos", + }, + }, + }, + expectedNode: "node-3", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + hashRing := newConsistentHashring(test.nodes, 10) + result, err := hashRing.GetN("tenant", test.ts, test.n) + if err != nil { + t.Error(err) + } + + if result != test.expectedNode { + t.Fatalf("invalid result: got %s, want %s", result, test.expectedNode) + } + }) + } +} + +func TestConsistentHashringConsistency(t *testing.T) { + series := makeSeries() + + ringA := []string{"node-1", "node-2", "node-3"} + a1, err := assignSeries(series, ringA) + if err != nil { + t.Fatal(err) + } + + ringB := []string{"node-1", "node-2", "node-3"} + a2, err := assignSeries(series, ringB) + if err != nil { + t.Fatal(err) + } + + for node, ts := range a1 { + if len(a2[node]) != len(ts) { + t.Fatalf("node %s has an inconsistent number of series", node) + } + } + + for node, ts := range a2 { + if len(a1[node]) != len(ts) { + t.Fatalf("node %s has an inconsistent number of series", node) + } + } +} + +func TestConsistentHashringIncreaseAtEnd(t *testing.T) { + series := makeSeries() + + initialRing := []string{"node-1", "node-2", "node-3"} + initialAssignments, err := assignSeries(series, initialRing) + if err != nil { + t.Fatal(err) + } + + resizedRing := []string{"node-1", "node-2", "node-3", "node-4", "node-5"} + reassignments, err := assignSeries(series, resizedRing) + if err != nil { + t.Fatal(err) + } + + // Assert that the initial nodes have no new keys after increasing the ring size + for _, node := range initialRing { + for _, ts := range reassignments[node] { + foundInInitialAssignment := findSeries(initialAssignments, node, ts) + if !foundInInitialAssignment { + t.Fatalf("node %s contains new series after resizing", node) + } + } + } +} + +func TestConsistentHashringIncreaseInMiddle(t *testing.T) { + series := makeSeries() + + initialRing := []string{"node-1", "node-3"} + initialAssignments, err := assignSeries(series, initialRing) + if err != nil { + t.Fatal(err) + } + + resizedRing := []string{"node-1", "node-2", "node-3"} + reassignments, err := assignSeries(series, resizedRing) + if err != nil { + t.Fatal(err) + } + + // Assert that the initial nodes have no new keys after increasing the ring size + for _, node := range initialRing { + for _, ts := range reassignments[node] { + foundInInitialAssignment := findSeries(initialAssignments, node, ts) + if !foundInInitialAssignment { + t.Fatalf("node %s contains new series after resizing", node) + } + } + } +} + +func makeSeries() []*prompb.TimeSeries { + series := make([]*prompb.TimeSeries, 30) + for i := 0; i < 30; i++ { + series[i] = &prompb.TimeSeries{ + Labels: []labelpb.ZLabel{ + { + Name: "pod", + Value: fmt.Sprintf("nginx-%d", i), + }, + }, + } + } + return series +} + +func findSeries(initialAssignments map[string][]*prompb.TimeSeries, node string, newSeries *prompb.TimeSeries) bool { + for _, oldSeries := range initialAssignments[node] { + l1 := labelpb.ZLabelsToPromLabels(newSeries.Labels) + l2 := labelpb.ZLabelsToPromLabels(oldSeries.Labels) + if labels.Equal(l1, l2) { + return true + } + } + + return false +} + +func assignSeries(series []*prompb.TimeSeries, nodes []string) (map[string][]*prompb.TimeSeries, error) { + hashRing := newConsistentHashring(nodes, 1000) + assignments := make(map[string][]*prompb.TimeSeries, 0) + for _, ts := range series { + result, err := hashRing.GetN("tenant", ts, 0) + if err != nil { + return nil, err + } + assignments[result] = append(assignments[result], ts) + + } + + return assignments, nil +}