/
hashring.go
103 lines (86 loc) · 2.8 KB
/
hashring.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package balancer
import (
	"errors"
	"math/rand"
	"sync"
	"time"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/grpclog"

	"github.com/authzed/spicedb/pkg/consistent"
)
// ctxKey is the dedicated type of the context keys declared by this package,
// so that CtxKey never compares equal to a plain string key.
type ctxKey string

// BalancerName is the name of the consistent-hashring balancer.
const BalancerName = "consistent-hashring"

// CtxKey is the key under which a gRPC request's context.Context carries the
// bytes to hash for that request. The value stored under it must be a []byte.
const CtxKey ctxKey = "requestKey"
// logger is this package's grpclog component logger, created under the
// component name "consistenthashring".
var logger = grpclog.Component("consistenthashring")
// NewConsistentHashringBuilder returns a balancer.Builder, named BalancerName,
// whose pickers route requests over a consistent hashring.
//
// hasher and replicationFactor are handed to consistent.NewHashring every time
// a picker is built; spread is the number of ring members requested per RPC,
// one of which is then chosen at random. Health checking is enabled in the
// base balancer configuration.
//
// Register the builder with gRPC before dialing, for example:
// `balancer.Register(NewConsistentHashringBuilder(hasher, factor, spread))`
// where `balancer` is google.golang.org/grpc/balancer.
func NewConsistentHashringBuilder(hasher consistent.HasherFunc, replicationFactor uint16, spread uint8) balancer.Builder {
	pickerBuilder := &consistentHashringPickerBuilder{
		hasher:            hasher,
		replicationFactor: replicationFactor,
		spread:            spread,
	}
	cfg := base.Config{HealthCheck: true}

	return base.NewBalancerBuilder(BalancerName, pickerBuilder, cfg)
}
// subConnMember pairs a gRPC SubConn with the string key used to place it on
// the consistent hashring.
type subConnMember struct {
	balancer.SubConn

	// key is the value returned by Key.
	key string
}

// Compile-time check that *subConnMember satisfies consistent.Member.
var _ consistent.Member = (*subConnMember)(nil)

// Key implements consistent.Member.
// The returned value is what gets hashed to position the member on the ring.
func (m subConnMember) Key() string {
	return m.key
}
// consistentHashringPickerBuilder holds the settings from which a new
// consistentHashringPicker is built each time Build is called.
type consistentHashringPickerBuilder struct {
	// hasher and replicationFactor are passed to consistent.NewHashring.
	hasher            consistent.HasherFunc
	replicationFactor uint16
	// spread is copied onto every picker that Build returns.
	spread uint8
}

// Build creates a picker over the ready SubConns in info.
//
// With no ready SubConns it returns an error picker carrying
// balancer.ErrNoSubConnAvailable. Otherwise every ready SubConn is added to a
// fresh hashring, keyed by its address concatenated with its server name; if
// any addition fails, an error picker carrying that error is returned instead.
func (b *consistentHashringPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
	logger.Infof("consistentHashringPicker: Build called with info: %v", info)

	ready := info.ReadySCs
	if len(ready) == 0 {
		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
	}

	ring := consistent.NewHashring(b.hasher, b.replicationFactor)
	for conn, connInfo := range ready {
		member := subConnMember{
			SubConn: conn,
			key:     connInfo.Address.Addr + connInfo.Address.ServerName,
		}
		if addErr := ring.Add(member); addErr != nil {
			return base.NewErrPicker(addErr)
		}
	}

	// Each picker gets its own time-seeded random source.
	seed := time.Now().UnixNano()
	picker := &consistentHashringPicker{
		hashring: ring,
		spread:   b.spread,
		rand:     rand.New(rand.NewSource(seed)),
	}
	return picker
}
// Errors returned by Pick when a request cannot be routed. They are returned
// instead of panicking so that a single bad request or a misconfiguration does
// not crash the client process.
var (
	errMissingRequestKey = errors.New("consistent-hashring balancer: request context has no []byte value under CtxKey")
	errNoRingMembers     = errors.New("consistent-hashring balancer: hashring returned no members")
	errUnexpectedMember  = errors.New("consistent-hashring balancer: hashring member is not a subConnMember")
)

// consistentHashringPicker chooses the SubConn for each RPC by looking up the
// request's key on a consistent hashring.
type consistentHashringPicker struct {
	// Mutex guards rand, which is not safe for concurrent use.
	sync.Mutex
	// hashring holds the members that Pick chooses from.
	hashring *consistent.Hashring
	// spread is the number of ring members requested for every key.
	spread uint8
	// rand selects one of the returned members when there is more than one.
	rand *rand.Rand
}

// Pick selects the SubConn for one RPC.
//
// The key to hash is read from info.Ctx under CtxKey and must be a []byte.
// The hashring is asked for p.spread members for that key, and one of the
// returned members is chosen uniformly at random.
//
// It returns an error, rather than panicking, when the context carries no
// []byte under CtxKey, when the hashring lookup fails, when the lookup yields
// no members, or when the chosen member is not a subConnMember.
func (p *consistentHashringPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	// Checked assertion: a missing or wrongly typed value must not panic here.
	key, ok := info.Ctx.Value(CtxKey).([]byte)
	if !ok {
		return balancer.PickResult{}, errMissingRequestKey
	}

	members, err := p.hashring.FindN(key, p.spread)
	if err != nil {
		return balancer.PickResult{}, err
	}

	// rand.Intn panics for n <= 0, so an empty result has to be rejected first.
	count := len(members)
	if count == 0 {
		return balancer.PickResult{}, errNoRingMembers
	}

	// With a single candidate there is nothing to randomize, so the lock and
	// the random source are skipped. The bound is the number of members
	// actually returned, which keeps the index in range.
	index := 0
	if count > 1 {
		// rand is not safe for concurrent use
		p.Lock()
		index = p.rand.Intn(count)
		p.Unlock()
	}

	chosen, ok := members[index].(subConnMember)
	if !ok {
		return balancer.PickResult{}, errUnexpectedMember
	}

	return balancer.PickResult{SubConn: chosen.SubConn}, nil
}