From f73c01759907fd0f766e7e2a7ee043a71173b585 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Tue, 21 Jun 2022 09:48:46 +0200 Subject: [PATCH] Improve ketama hashring replication With the Ketama hashring, replication is currently handled by choosing subsequent nodes in the list of endpoints. This can lead to existing nodes getting more series when the hashring is scaled. This commit changes replication to choose subsequent nodes from the hashring which should not create new series in old nodes when the hashring is scaled. Signed-off-by: Filip Petkovski --- CHANGELOG.md | 1 + pkg/receive/hashring.go | 9 ++----- pkg/receive/hashring_test.go | 52 +++++++++++++++++++++++++++--------- 3 files changed, 42 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a52d488a51..69bd2fe5094 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed - [#5339](https://github.com/thanos-io/thanos/pull/5339) Receive: Fix deadlock on interrupt in routerOnly mode - [#5357](https://github.com/thanos-io/thanos/pull/5357) Store: fix groupcache handling of slashes +- [#5427](https://github.com/thanos-io/thanos/pull/5427) Receive: Fix Ketama hashring replication consistency ### Added diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index cb4c3e78e2a..3d6cf36555c 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -106,11 +106,6 @@ type ketamaHashring struct { } func newKetamaHashring(endpoints []string, sectionsPerNode int) *ketamaHashring { - // Replication works by choosing subsequent nodes in the ring. - // In order to improve consistency, we avoid relying on the ordering of the endpoints - // and sort them lexicographically. - sort.Strings(endpoints) - numSections := len(endpoints) * sectionsPerNode ring := ketamaHashring{ endpoints: endpoints, @@ -156,8 +151,8 @@ func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st i = 0 } - nodeIndex := (c.sections[i].endpointIndex + n) % c.numEndpoints - + i = (i + n) % numSections + nodeIndex := c.sections[i].endpointIndex return c.endpoints[nodeIndex], nil } diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index 676aaf8c078..e63120fb126 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -154,7 +154,7 @@ func TestHashringGet(t *testing.T) { } } -func TestConsistentHashringGet(t *testing.T) { +func TestKetamaHashringGet(t *testing.T) { baseTS := &prompb.TimeSeries{ Labels: []labelpb.ZLabel{ { @@ -181,21 +181,21 @@ func TestConsistentHashringGet(t *testing.T) { nodes: []string{"node-1", "node-2", "node-3"}, ts: baseTS, n: 1, - expectedNode: "node-3", + expectedNode: "node-1", }, { name: "base case with replication", nodes: []string{"node-1", "node-2", "node-3"}, ts: baseTS, n: 2, - expectedNode: "node-1", + expectedNode: "node-2", }, { name: "base case with replication and reordered nodes", nodes: []string{"node-1", "node-3", "node-2"}, ts: baseTS, n: 2, - expectedNode: "node-1", + expectedNode: "node-2", }, { name: "base case with new node at beginning of ring", @@ -234,7 +234,7 @@ func TestConsistentHashringGet(t *testing.T) { } } -func TestConsistentHashringConsistency(t *testing.T) { +func TestKetamaHashringConsistency(t *testing.T) { series := makeSeries(10000) ringA := []string{"node-1", "node-2", "node-3"} @@ -254,7 +254,7 @@ func TestConsistentHashringConsistency(t *testing.T) { } } -func TestConsistentHashringIncreaseAtEnd(t *testing.T) { +func TestKetamaHashringIncreaseAtEnd(t *testing.T) { series := makeSeries(10000) initialRing := []string{"node-1", "node-2", "node-3"} @@ -274,7 +274,7 @@ func TestConsistentHashringIncreaseAtEnd(t *testing.T) { } } -func TestConsistentHashringIncreaseInMiddle(t *testing.T) { +func TestKetamaHashringIncreaseInMiddle(t *testing.T) { series := makeSeries(10000) initialRing := []string{"node-1", "node-3"} @@ -294,6 +294,26 @@ func TestConsistentHashringIncreaseInMiddle(t *testing.T) { } } +func TestKetamaHashringReplicationConsistency(t *testing.T) { + series := makeSeries(10000) + + initialRing := []string{"node-1", "node-4", "node-5"} + initialAssignments, err := assignReplicatedSeries(series, initialRing, 2) + require.NoError(t, err) + + resizedRing := []string{"node-4", "node-3", "node-1", "node-2", "node-5"} + reassignments, err := assignReplicatedSeries(series, resizedRing, 2) + require.NoError(t, err) + + // Assert that the initial nodes have no new keys after increasing the ring size + for _, node := range initialRing { + for _, ts := range reassignments[node] { + foundInInitialAssignment := findSeries(initialAssignments, node, ts) + require.True(t, foundInInitialAssignment, "node %s contains new series after resizing", node) + } + } +} + func makeSeries(numSeries int) []*prompb.TimeSeries { series := make([]*prompb.TimeSeries, numSeries) for i := 0; i < numSeries; i++ { @@ -322,15 +342,21 @@ func findSeries(initialAssignments map[string][]*prompb.TimeSeries, node string, } func assignSeries(series []*prompb.TimeSeries, nodes []string) (map[string][]*prompb.TimeSeries, error) { + return assignReplicatedSeries(series, nodes, 0) +} + +func assignReplicatedSeries(series []*prompb.TimeSeries, nodes []string, replicas uint64) (map[string][]*prompb.TimeSeries, error) { hashRing := newKetamaHashring(nodes, SectionsPerNode) assignments := make(map[string][]*prompb.TimeSeries) - for _, ts := range series { - result, err := hashRing.Get("tenant", ts) - if err != nil { - return nil, err - } - assignments[result] = append(assignments[result], ts) + for i := uint64(0); i < replicas; i++ { + for _, ts := range series { + result, err := hashRing.GetN("tenant", ts, i) + if err != nil { + return nil, err + } + assignments[result] = append(assignments[result], ts) + } } return assignments, nil