Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve ketama hashring replication #5427

Merged
merged 1 commit into from Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Fixed
- [#5339](https://github.com/thanos-io/thanos/pull/5339) Receive: Fix deadlock on interrupt in routerOnly mode
- [#5357](https://github.com/thanos-io/thanos/pull/5357) Store: fix groupcache handling of slashes
- [#5427](https://github.com/thanos-io/thanos/pull/5427) Receive: Fix Ketama hashring replication consistency

### Added

Expand Down
9 changes: 2 additions & 7 deletions pkg/receive/hashring.go
Expand Up @@ -106,11 +106,6 @@ type ketamaHashring struct {
}

func newKetamaHashring(endpoints []string, sectionsPerNode int) *ketamaHashring {
// Replication works by choosing subsequent nodes in the ring.
// In order to improve consistency, we avoid relying on the ordering of the endpoints
// and sort them lexicographically.
sort.Strings(endpoints)

numSections := len(endpoints) * sectionsPerNode
ring := ketamaHashring{
endpoints: endpoints,
Expand Down Expand Up @@ -156,8 +151,8 @@ func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
i = 0
}

nodeIndex := (c.sections[i].endpointIndex + n) % c.numEndpoints

i = (i + n) % numSections
nodeIndex := c.sections[i].endpointIndex
return c.endpoints[nodeIndex], nil
}

Expand Down
61 changes: 44 additions & 17 deletions pkg/receive/hashring_test.go
Expand Up @@ -154,7 +154,7 @@ func TestHashringGet(t *testing.T) {
}
}

func TestConsistentHashringGet(t *testing.T) {
func TestKetamaHashringGet(t *testing.T) {
baseTS := &prompb.TimeSeries{
Labels: []labelpb.ZLabel{
{
Expand All @@ -181,21 +181,21 @@ func TestConsistentHashringGet(t *testing.T) {
nodes: []string{"node-1", "node-2", "node-3"},
ts: baseTS,
n: 1,
expectedNode: "node-3",
expectedNode: "node-1",
},
{
name: "base case with replication",
nodes: []string{"node-1", "node-2", "node-3"},
ts: baseTS,
n: 2,
expectedNode: "node-1",
expectedNode: "node-2",
},
{
name: "base case with replication and reordered nodes",
nodes: []string{"node-1", "node-3", "node-2"},
ts: baseTS,
n: 2,
expectedNode: "node-1",
expectedNode: "node-2",
},
{
name: "base case with new node at beginning of ring",
Expand Down Expand Up @@ -234,8 +234,8 @@ func TestConsistentHashringGet(t *testing.T) {
}
}

func TestConsistentHashringConsistency(t *testing.T) {
series := makeSeries(10000)
func TestKetamaHashringConsistency(t *testing.T) {
series := makeSeries()

ringA := []string{"node-1", "node-2", "node-3"}
a1, err := assignSeries(series, ringA)
Expand All @@ -254,8 +254,8 @@ func TestConsistentHashringConsistency(t *testing.T) {
}
}

func TestConsistentHashringIncreaseAtEnd(t *testing.T) {
series := makeSeries(10000)
func TestKetamaHashringIncreaseAtEnd(t *testing.T) {
series := makeSeries()

initialRing := []string{"node-1", "node-2", "node-3"}
initialAssignments, err := assignSeries(series, initialRing)
Expand All @@ -274,8 +274,8 @@ func TestConsistentHashringIncreaseAtEnd(t *testing.T) {
}
}

func TestConsistentHashringIncreaseInMiddle(t *testing.T) {
series := makeSeries(10000)
func TestKetamaHashringIncreaseInMiddle(t *testing.T) {
series := makeSeries()

initialRing := []string{"node-1", "node-3"}
initialAssignments, err := assignSeries(series, initialRing)
Expand All @@ -294,7 +294,28 @@ func TestConsistentHashringIncreaseInMiddle(t *testing.T) {
}
}

func makeSeries(numSeries int) []*prompb.TimeSeries {
// TestKetamaHashringReplicationConsistency checks that growing the ring does
// not move any replicated series onto a node that was already part of the
// ring: every series a pre-existing node holds after the resize must also
// have been assigned to that same node before the resize.
func TestKetamaHashringReplicationConsistency(t *testing.T) {
	allSeries := makeSeries()

	// The resized ring contains every original node plus two new ones, listed
	// in a different order than the original ring.
	before := []string{"node-1", "node-4", "node-5"}
	after := []string{"node-4", "node-3", "node-1", "node-2", "node-5"}

	oldAssignments, err := assignReplicatedSeries(allSeries, before, 2)
	require.NoError(t, err)

	newAssignments, err := assignReplicatedSeries(allSeries, after, 2)
	require.NoError(t, err)

	// Only the original nodes are inspected; the newly added nodes are
	// expected to receive series and are therefore not checked here.
	for idx := range before {
		name := before[idx]
		for _, candidate := range newAssignments[name] {
			alreadyOwned := findSeries(oldAssignments, name, candidate)
			require.True(t, alreadyOwned, "node %s contains new series after resizing", name)
		}
	}
}

func makeSeries() []*prompb.TimeSeries {
numSeries := 10000
series := make([]*prompb.TimeSeries, numSeries)
for i := 0; i < numSeries; i++ {
series[i] = &prompb.TimeSeries{
Expand Down Expand Up @@ -322,15 +343,21 @@ func findSeries(initialAssignments map[string][]*prompb.TimeSeries, node string,
}

// assignSeries maps every series to a node of a ring built from nodes,
// without any additional replication, and returns the series grouped by the
// node name they were assigned to.
//
// It delegates to assignReplicatedSeries with a replica count of 1. That
// helper iterates replica indexes in the half-open range [0, replicas), so a
// count of 1 yields exactly one lookup per series at replica index 0. Passing
// 0 would make that loop run zero times and return an empty map, which would
// turn every assertion built on the result into a no-op.
func assignSeries(series []*prompb.TimeSeries, nodes []string) (map[string][]*prompb.TimeSeries, error) {
	return assignReplicatedSeries(series, nodes, 1)
}

func assignReplicatedSeries(series []*prompb.TimeSeries, nodes []string, replicas uint64) (map[string][]*prompb.TimeSeries, error) {
hashRing := newKetamaHashring(nodes, SectionsPerNode)
assignments := make(map[string][]*prompb.TimeSeries)
for _, ts := range series {
result, err := hashRing.Get("tenant", ts)
if err != nil {
return nil, err
}
assignments[result] = append(assignments[result], ts)
for i := uint64(0); i < replicas; i++ {
for _, ts := range series {
result, err := hashRing.GetN("tenant", ts, i)
if err != nil {
return nil, err
}
assignments[result] = append(assignments[result], ts)

}
}

return assignments, nil
Expand Down