Skip to content

Commit

Permalink
Improve ketama hashring replication
Browse files Browse the repository at this point in the history
With the Ketama hashring, replication is currently handled by choosing
subsequent nodes in the list of endpoints. This can lead to existing nodes
getting more series when the hashring is scaled.

This commit changes replication to choose subsequent nodes from the hashring
which should not create new series in old nodes when the hashring is scaled.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Jun 21, 2022
1 parent 31ce79b commit f73c017
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Fixed
- [#5339](https://github.com/thanos-io/thanos/pull/5339) Receive: Fix deadlock on interrupt in routerOnly mode
- [#5357](https://github.com/thanos-io/thanos/pull/5357) Store: fix groupcache handling of slashes
- [#5427](https://github.com/thanos-io/thanos/pull/5427) Receive: Fix Ketama hashring replication consistency

### Added

Expand Down
9 changes: 2 additions & 7 deletions pkg/receive/hashring.go
Expand Up @@ -106,11 +106,6 @@ type ketamaHashring struct {
}

func newKetamaHashring(endpoints []string, sectionsPerNode int) *ketamaHashring {
// Replication works by choosing subsequent nodes in the ring.
// In order to improve consistency, we avoid relying on the ordering of the endpoints
// and sort them lexicographically.
sort.Strings(endpoints)

numSections := len(endpoints) * sectionsPerNode
ring := ketamaHashring{
endpoints: endpoints,
Expand Down Expand Up @@ -156,8 +151,8 @@ func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
i = 0
}

nodeIndex := (c.sections[i].endpointIndex + n) % c.numEndpoints

i = (i + n) % numSections
nodeIndex := c.sections[i].endpointIndex
return c.endpoints[nodeIndex], nil
}

Expand Down
52 changes: 39 additions & 13 deletions pkg/receive/hashring_test.go
Expand Up @@ -154,7 +154,7 @@ func TestHashringGet(t *testing.T) {
}
}

func TestConsistentHashringGet(t *testing.T) {
func TestKetamaHashringGet(t *testing.T) {
baseTS := &prompb.TimeSeries{
Labels: []labelpb.ZLabel{
{
Expand All @@ -181,21 +181,21 @@ func TestConsistentHashringGet(t *testing.T) {
nodes: []string{"node-1", "node-2", "node-3"},
ts: baseTS,
n: 1,
expectedNode: "node-3",
expectedNode: "node-1",
},
{
name: "base case with replication",
nodes: []string{"node-1", "node-2", "node-3"},
ts: baseTS,
n: 2,
expectedNode: "node-1",
expectedNode: "node-2",
},
{
name: "base case with replication and reordered nodes",
nodes: []string{"node-1", "node-3", "node-2"},
ts: baseTS,
n: 2,
expectedNode: "node-1",
expectedNode: "node-2",
},
{
name: "base case with new node at beginning of ring",
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestConsistentHashringGet(t *testing.T) {
}
}

func TestConsistentHashringConsistency(t *testing.T) {
func TestKetamaHashringConsistency(t *testing.T) {
series := makeSeries(10000)

ringA := []string{"node-1", "node-2", "node-3"}
Expand All @@ -254,7 +254,7 @@ func TestConsistentHashringConsistency(t *testing.T) {
}
}

func TestConsistentHashringIncreaseAtEnd(t *testing.T) {
func TestKetamaHashringIncreaseAtEnd(t *testing.T) {
series := makeSeries(10000)

initialRing := []string{"node-1", "node-2", "node-3"}
Expand All @@ -274,7 +274,7 @@ func TestConsistentHashringIncreaseAtEnd(t *testing.T) {
}
}

func TestConsistentHashringIncreaseInMiddle(t *testing.T) {
func TestKetamaHashringIncreaseInMiddle(t *testing.T) {
series := makeSeries(10000)

initialRing := []string{"node-1", "node-3"}
Expand All @@ -294,6 +294,26 @@ func TestConsistentHashringIncreaseInMiddle(t *testing.T) {
}
}

func TestKetamaHashringReplicationConsistency(t *testing.T) {
series := makeSeries(10000)

initialRing := []string{"node-1", "node-4", "node-5"}
initialAssignments, err := assignReplicatedSeries(series, initialRing, 2)
require.NoError(t, err)

resizedRing := []string{"node-4", "node-3", "node-1", "node-2", "node-5"}
reassignments, err := assignReplicatedSeries(series, resizedRing, 2)
require.NoError(t, err)

// Assert that the initial nodes have no new keys after increasing the ring size
for _, node := range initialRing {
for _, ts := range reassignments[node] {
foundInInitialAssignment := findSeries(initialAssignments, node, ts)
require.True(t, foundInInitialAssignment, "node %s contains new series after resizing", node)
}
}
}

func makeSeries(numSeries int) []*prompb.TimeSeries {
series := make([]*prompb.TimeSeries, numSeries)
for i := 0; i < numSeries; i++ {
Expand Down Expand Up @@ -322,15 +342,21 @@ func findSeries(initialAssignments map[string][]*prompb.TimeSeries, node string,
}

func assignSeries(series []*prompb.TimeSeries, nodes []string) (map[string][]*prompb.TimeSeries, error) {
return assignReplicatedSeries(series, nodes, 0)
}

func assignReplicatedSeries(series []*prompb.TimeSeries, nodes []string, replicas uint64) (map[string][]*prompb.TimeSeries, error) {
hashRing := newKetamaHashring(nodes, SectionsPerNode)
assignments := make(map[string][]*prompb.TimeSeries)
for _, ts := range series {
result, err := hashRing.Get("tenant", ts)
if err != nil {
return nil, err
}
assignments[result] = append(assignments[result], ts)
for i := uint64(0); i < replicas; i++ {
for _, ts := range series {
result, err := hashRing.GetN("tenant", ts, i)
if err != nil {
return nil, err
}
assignments[result] = append(assignments[result], ts)

}
}

return assignments, nil
Expand Down

0 comments on commit f73c017

Please sign in to comment.