From 1587739d0300cdc1202dcc9f37dd603988980810 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Tue, 21 Jun 2022 09:48:46 +0200 Subject: [PATCH] Improve Ketama hashring replication With the Ketama hashring, replication is currently handled by choosing subsequent nodes in the list of endpoints. This can lead to existing nodes getting more series when the hashring is scaled. This commit changes replication to choose subsequent nodes from the hashring, which should not create new series in old nodes when the hashring is scaled. Signed-off-by: Filip Petkovski --- CHANGELOG.md | 1 + pkg/receive/hashring.go | 9 ++---- pkg/receive/hashring_test.go | 61 ++++++++++++++++++++++++++---------- 3 files changed, 47 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a52d488a5..69bd2fe509 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed - [#5339](https://github.com/thanos-io/thanos/pull/5339) Receive: Fix deadlock on interrupt in routerOnly mode - [#5357](https://github.com/thanos-io/thanos/pull/5357) Store: fix groupcache handling of slashes +- [#5427](https://github.com/thanos-io/thanos/pull/5427) Receive: Fix Ketama hashring replication consistency ### Added diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index cb4c3e78e2..3d6cf36555 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -106,11 +106,6 @@ type ketamaHashring struct { } func newKetamaHashring(endpoints []string, sectionsPerNode int) *ketamaHashring { - // Replication works by choosing subsequent nodes in the ring. - // In order to improve consistency, we avoid relying on the ordering of the endpoints - // and sort them lexicographically. 
- sort.Strings(endpoints) - numSections := len(endpoints) * sectionsPerNode ring := ketamaHashring{ endpoints: endpoints, @@ -156,8 +151,8 @@ func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st i = 0 } - nodeIndex := (c.sections[i].endpointIndex + n) % c.numEndpoints - + i = (i + n) % numSections + nodeIndex := c.sections[i].endpointIndex return c.endpoints[nodeIndex], nil } diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index 676aaf8c07..d2b4171d96 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -154,7 +154,7 @@ func TestHashringGet(t *testing.T) { } } -func TestConsistentHashringGet(t *testing.T) { +func TestKetamaHashringGet(t *testing.T) { baseTS := &prompb.TimeSeries{ Labels: []labelpb.ZLabel{ { @@ -181,21 +181,21 @@ func TestConsistentHashringGet(t *testing.T) { nodes: []string{"node-1", "node-2", "node-3"}, ts: baseTS, n: 1, - expectedNode: "node-3", + expectedNode: "node-1", }, { name: "base case with replication", nodes: []string{"node-1", "node-2", "node-3"}, ts: baseTS, n: 2, - expectedNode: "node-1", + expectedNode: "node-2", }, { name: "base case with replication and reordered nodes", nodes: []string{"node-1", "node-3", "node-2"}, ts: baseTS, n: 2, - expectedNode: "node-1", + expectedNode: "node-2", }, { name: "base case with new node at beginning of ring", @@ -234,8 +234,8 @@ func TestConsistentHashringGet(t *testing.T) { } } -func TestConsistentHashringConsistency(t *testing.T) { - series := makeSeries(10000) +func TestKetamaHashringConsistency(t *testing.T) { + series := makeSeries() ringA := []string{"node-1", "node-2", "node-3"} a1, err := assignSeries(series, ringA) @@ -254,8 +254,8 @@ func TestConsistentHashringConsistency(t *testing.T) { } } -func TestConsistentHashringIncreaseAtEnd(t *testing.T) { - series := makeSeries(10000) +func TestKetamaHashringIncreaseAtEnd(t *testing.T) { + series := makeSeries() initialRing := []string{"node-1", "node-2", "node-3"} 
initialAssignments, err := assignSeries(series, initialRing) @@ -274,8 +274,8 @@ func TestConsistentHashringIncreaseAtEnd(t *testing.T) { } } -func TestConsistentHashringIncreaseInMiddle(t *testing.T) { - series := makeSeries(10000) +func TestKetamaHashringIncreaseInMiddle(t *testing.T) { + series := makeSeries() initialRing := []string{"node-1", "node-3"} initialAssignments, err := assignSeries(series, initialRing) @@ -294,7 +294,28 @@ func TestConsistentHashringIncreaseInMiddle(t *testing.T) { } } -func makeSeries(numSeries int) []*prompb.TimeSeries { +func TestKetamaHashringReplicationConsistency(t *testing.T) { + series := makeSeries() + + initialRing := []string{"node-1", "node-4", "node-5"} + initialAssignments, err := assignReplicatedSeries(series, initialRing, 2) + require.NoError(t, err) + + resizedRing := []string{"node-4", "node-3", "node-1", "node-2", "node-5"} + reassignments, err := assignReplicatedSeries(series, resizedRing, 2) + require.NoError(t, err) + + // Assert that the initial nodes have no new keys after increasing the ring size + for _, node := range initialRing { + for _, ts := range reassignments[node] { + foundInInitialAssignment := findSeries(initialAssignments, node, ts) + require.True(t, foundInInitialAssignment, "node %s contains new series after resizing", node) + } + } +} + +func makeSeries() []*prompb.TimeSeries { + numSeries := 10000 series := make([]*prompb.TimeSeries, numSeries) for i := 0; i < numSeries; i++ { series[i] = &prompb.TimeSeries{ @@ -322,15 +343,21 @@ func findSeries(initialAssignments map[string][]*prompb.TimeSeries, node string, } func assignSeries(series []*prompb.TimeSeries, nodes []string) (map[string][]*prompb.TimeSeries, error) { + return assignReplicatedSeries(series, nodes, 0) +} + +func assignReplicatedSeries(series []*prompb.TimeSeries, nodes []string, replicas uint64) (map[string][]*prompb.TimeSeries, error) { hashRing := newKetamaHashring(nodes, SectionsPerNode) assignments := 
make(map[string][]*prompb.TimeSeries) - for _, ts := range series { - result, err := hashRing.Get("tenant", ts) - if err != nil { - return nil, err - } - assignments[result] = append(assignments[result], ts) + for i := uint64(0); i < replicas; i++ { + for _, ts := range series { + result, err := hashRing.GetN("tenant", ts, i) + if err != nil { + return nil, err + } + assignments[result] = append(assignments[result], ts) + } } return assignments, nil