Skip to content

Commit

Permalink
Make the assignment consistent across all clusters
Browse files Browse the repository at this point in the history
- The assignment or running of the algorithm has to be consistent across all the clusters. Changed the function to return a map where the consistent hash will be used to build the map

- Modifications to the createConsistentHashingWithBoundLoads function. It now builds the cluster-to-shard map. Note that the assignment must be consistent across all shards, which is why the cluster list must be sorted before being run through the consistent-hashing algorithm.

Signed-off-by: Akram Ben Aissi <akram.benaissi@gmail.com>
  • Loading branch information
akram committed Apr 29, 2024
1 parent 3681872 commit 8e61149
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 51 deletions.
47 changes: 16 additions & 31 deletions controller/sharding/consistent/consistent.go
Expand Up @@ -116,49 +116,34 @@ func (c *Consistent) GetLeast(client string) (string, error) {
if c.clients.Len() == 0 {
return "", ErrNoHosts
}

h := c.hash(client)
idx := c.search(h)

i := idx
for {
x := item{uint64(i)}
key := c.clients.Get(x)
var foundItem btree.Item
c.clients.AscendGreaterOrEqual(item{h}, func(bItem btree.Item) bool {
if h != bItem.(item).value {
foundItem = bItem
return false // stop the iteration
}
return true
})

if foundItem == nil {
// If no host found, wrap around to the first one.
foundItem = c.clients.Min()
}
key := c.clients.Get(foundItem)
if key != nil {
host := c.servers[key.(*item).value]
host := c.servers[key.(item).value]
if c.loadOK(host) {
return host, nil
}
i++
if i >= c.clients.Len() {
i = 0
}
h = key.(item).value
} else {
return client, nil
}
}
}

// search walks the hash ring in ascending order and returns the index of the
// first entry whose hash is greater than or equal to key. When every stored
// hash is smaller than key, the lookup wraps around and index 0 is returned.
func (c *Consistent) search(key uint64) int {
	pos := 0
	matched := false
	c.clients.Ascend(func(entry btree.Item) bool {
		if entry.(item).value >= key {
			matched = true
			return false // first candidate found; end the walk
		}
		pos++
		return true
	})
	if matched {
		return pos
	}
	// Wrap around: no entry at or after key on the ring.
	return 0
}

// Sets the load of `server` to the given `load`
func (c *Consistent) UpdateLoad(server string, load int64) {
c.Lock()
Expand Down
59 changes: 46 additions & 13 deletions controller/sharding/sharding.go
Expand Up @@ -87,7 +87,7 @@ func GetDistributionFunction(clusters clusterAccessor, apps appAccessor, shardin
case common.LegacyShardingAlgorithm:
distributionFunction = LegacyDistributionFunction(replicasCount)
case common.ConsistentHashingWithBoundedLoadsAlgorithm:
distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(clusters, replicasCount)
distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(clusters, apps, replicasCount)
default:
log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm)
}
Expand Down Expand Up @@ -166,7 +166,7 @@ func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) Dist
// for a given cluster the function will return the shard number based on a consistent hashing with bounded loads algorithm.
// This function ensures an almost homogenous distribution: each shards got assigned the fairly similar number of
// clusters +/-10% , but with it is resilient to sharding and/or number of clusters changes.
func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAccessor, replicas int) DistributionFunction {
func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAccessor, apps appAccessor, replicas int) DistributionFunction {
return func(c *v1alpha1.Cluster) int {
if replicas > 0 {
if c == nil { // in-cluster does not necessarly have a secret assigned. So we are receiving a nil cluster here.
Expand All @@ -184,17 +184,12 @@ func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAcces
log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
return -1
}
consistentHashing := createConsistentHashingWithBoundLoads(replicas)
clusterIndex, err := consistentHashing.Get(c.ID)
if err != nil {
shardIndexedByCluster := createConsistentHashingWithBoundLoads(replicas, clusters, apps)
shard, ok := shardIndexedByCluster[c.ID]
if !ok {
log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
return -1
}
shard, err := strconv.Atoi(clusterIndex)
if err != nil {
log.Errorf("Consistent Hashing was supposed to return a shard index but it returned %d", err)
return -1
}
log.Debugf("Cluster with id=%s will be processed by shard %d", c.ID, shard)
return shard
}
Expand All @@ -204,15 +199,53 @@ func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAcces
}
}

func createConsistentHashingWithBoundLoads(replicas int) *consistent.Consistent {
func createConsistentHashingWithBoundLoads(replicas int, getCluster clusterAccessor, getApp appAccessor) map[string]int {
clusters := getSortedClustersList(getCluster)
appDistribution := getAppDistribution(getCluster, getApp)
shardIndexedByCluster := make(map[string]int)
appsIndexedByShard := make(map[string]int64)
consistentHashing := consistent.New()
// Adding a shard with id "-1" as a reserved value for clusters that does not have an assigned shard
// this happens for clusters that are removed for the clusters list
//consistentHashing.Add("-1")
for i := 0; i < replicas; i++ {
consistentHashing.Add(strconv.Itoa(i))
shard := strconv.Itoa(i)
consistentHashing.Add(shard)
appsIndexedByShard[shard] = 0
}

for _, c := range clusters {
clusterIndex, err := consistentHashing.GetLeast(c.ID)
if err != nil {
log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
}
shardIndexedByCluster[c.ID], err = strconv.Atoi(clusterIndex)
if err != nil {
log.Errorf("Consistent Hashing was supposed to return a shard index but it returned %d", err)
}
numApps, ok := appDistribution[c.Server]
if !ok {
numApps = 0
}
appsIndexedByShard[clusterIndex] += numApps
consistentHashing.UpdateLoad(clusterIndex, appsIndexedByShard[clusterIndex])
}

return shardIndexedByCluster
}

// getAppDistribution counts the applications targeting each cluster and
// returns a map keyed by the application's destination server URL.
func getAppDistribution(getCluster clusterAccessor, getApps appAccessor) map[string]int64 {
	apps := getApps()
	clusters := getCluster()
	// Pre-size with the cluster count: at most one entry per destination.
	appDistribution := make(map[string]int64, len(clusters))

	for _, a := range apps {
		// Incrementing a missing key starts from the zero value, so no
		// explicit initialization is required.
		appDistribution[a.Spec.Destination.Server]++
	}
	return appDistribution
}

// NoShardingDistributionFunction returns a DistributionFunction that will process all cluster by shard 0
Expand Down
15 changes: 8 additions & 7 deletions controller/sharding/sharding_test.go
Expand Up @@ -287,12 +287,13 @@ func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) {
clusters = append(clusters, createCluster(cluster, id))
}
clusterAccessor := getClusterAccessor(clusters)
appAccessor, _, _, _, _, _ := createTestApps()
clusterList := &v1alpha1.ClusterList{Items: clusters}
db.On("ListClusters", mock.Anything).Return(clusterList, nil)
// Test with replicas set to 3
replicasCount := 3
db.On("GetApplicationControllerReplicas").Return(replicasCount)
distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount)
distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount)
assert.Equal(t, 0, distributionFunction(nil))
distributionMap := map[int]int{}
assignementMap := map[string]int{}
Expand Down Expand Up @@ -327,7 +328,7 @@ func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) {

// Now we will decrease the number of replicas to 2, and we should see only clusters that were attached to shard 2 to be reassigned
replicasCount = 2
distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount)
distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), appAccessor, replicasCount)
removedCluster := clusterList.Items[len(clusterList.Items)-1]
for i := 0; i < clusterCount; i++ {
c := &clusters[i]
Expand All @@ -338,11 +339,10 @@ func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) {
t.Fail()
}
}

// Now, we remove the last added cluster, it should be unassigned
removedCluster = clusterList.Items[len(clusterList.Items)-1]
clusterList.Items = clusterList.Items[:len(clusterList.Items)-1]
distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), replicasCount)
distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), appAccessor, replicasCount)
assert.Equal(t, -1, distributionFunction(&removedCluster))
}

Expand All @@ -352,11 +352,11 @@ func TestConsistentHashingWhenClusterWithZeroReplicas(t *testing.T) {
clusterAccessor := getClusterAccessor(clusters)
clusterList := &v1alpha1.ClusterList{Items: clusters}
db.On("ListClusters", mock.Anything).Return(clusterList, nil)

appAccessor, _, _, _, _, _ := createTestApps()
// Test with replicas set to 0
replicasCount := 0
db.On("GetApplicationControllerReplicas").Return(replicasCount)
distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount)
distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount)
assert.Equal(t, -1, distributionFunction(nil))
}

Expand All @@ -373,7 +373,8 @@ func TestConsistentHashingWhenClusterWithFixedShard(t *testing.T) {
// Test with replicas set to 5
replicasCount := 5
db.On("GetApplicationControllerReplicas").Return(replicasCount)
distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, replicasCount)
appAccessor, _, _, _, _, _ := createTestApps()
distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount)
assert.Equal(t, fixedShard, int64(distributionFunction(cluster)))

}
Expand Down

0 comments on commit 8e61149

Please sign in to comment.