Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dispatcher: Give a couple of minutes for nodes to recover on re-election #1238

Merged
merged 1 commit into from
Jul 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 7 additions & 4 deletions manager/dispatcher/dispatcher.go
Expand Up @@ -31,6 +31,7 @@ const (
defaultHeartBeatEpsilon = 500 * time.Millisecond
defaultGracePeriodMultiplier = 3
defaultRateLimitPeriod = 8 * time.Second
defaultStartupGracePeriod = 2 * time.Minute

// maxBatchItems is the threshold of queued writes that should
// trigger an actual transaction to commit them to the shared store.
Expand Down Expand Up @@ -61,9 +62,10 @@ var (
// DefaultConfig.
type Config struct {
// Addr configures the address the dispatcher reports to agents.
Addr string
HeartbeatPeriod time.Duration
HeartbeatEpsilon time.Duration
Addr string
HeartbeatPeriod time.Duration
HeartbeatEpsilon time.Duration
StartupGracePeriod time.Duration
// RateLimitPeriod specifies how often node with same ID can try to register
// new session.
RateLimitPeriod time.Duration
Expand All @@ -75,6 +77,7 @@ func DefaultConfig() *Config {
return &Config{
HeartbeatPeriod: DefaultHeartBeatPeriod,
HeartbeatEpsilon: defaultHeartBeatEpsilon,
StartupGracePeriod: defaultStartupGracePeriod,
RateLimitPeriod: defaultRateLimitPeriod,
GracePeriodMultiplier: defaultGracePeriodMultiplier,
}
Expand Down Expand Up @@ -122,7 +125,7 @@ func (b weightedPeerByNodeID) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func New(cluster Cluster, c *Config) *Dispatcher {
return &Dispatcher{
addr: c.Addr,
nodes: newNodeStore(c.HeartbeatPeriod, c.HeartbeatEpsilon, c.GracePeriodMultiplier, c.RateLimitPeriod),
nodes: newNodeStore(c.HeartbeatPeriod, c.HeartbeatEpsilon, c.GracePeriodMultiplier, c.StartupGracePeriod, c.RateLimitPeriod),
store: cluster.MemoryStore(),
cluster: cluster,
mgrQueue: watch.NewQueue(16),
Expand Down
2 changes: 2 additions & 0 deletions manager/dispatcher/dispatcher_test.go
Expand Up @@ -273,6 +273,7 @@ func TestHeartbeatTimeout(t *testing.T) {
cfg := DefaultConfig()
cfg.HeartbeatPeriod = 100 * time.Millisecond
cfg.HeartbeatEpsilon = 0
cfg.StartupGracePeriod = 100 * time.Millisecond
gd, err := startDispatcher(cfg)
assert.NoError(t, err)
defer gd.Close()
Expand Down Expand Up @@ -737,6 +738,7 @@ func TestNodesCount(t *testing.T) {
cfg := DefaultConfig()
cfg.HeartbeatPeriod = 100 * time.Millisecond
cfg.HeartbeatEpsilon = 0
cfg.StartupGracePeriod = 100 * time.Millisecond
gd, err := startDispatcher(cfg)
assert.NoError(t, err)
defer gd.Close()
Expand Down
11 changes: 8 additions & 3 deletions manager/dispatcher/nodes.go
Expand Up @@ -12,7 +12,9 @@ import (
"github.com/docker/swarmkit/manager/dispatcher/heartbeat"
)

const rateLimitCount = 3
const (
// rateLimitCount is a registration rate-limit threshold.
// NOTE(review): its use is not visible in this view; presumably the number
// of session registrations a node may make within rateLimitPeriod — confirm.
rateLimitCount = 3
)

type registeredNode struct {
SessionID string
Expand Down Expand Up @@ -45,16 +47,18 @@ func (rn *registeredNode) checkSessionID(sessionID string) error {
// nodeStore keeps the set of nodes registered with the dispatcher together
// with the timing parameters used when creating their heartbeats.
type nodeStore struct {
// periodChooser picks the heartbeat period; it is built from the heartbeat
// period and epsilon handed to newNodeStore.
periodChooser *periodChooser
// gracePeriodMultiplier scales the chosen heartbeat period to obtain a
// node's heartbeat timeout. It is stored as a time.Duration so that it can
// multiply a Duration directly.
gracePeriodMultiplier time.Duration
// startupGracePeriod is extra time added on top of the scaled heartbeat
// period for nodes registered through AddUnknown.
startupGracePeriod time.Duration
// rateLimitPeriod is carried over from Config.RateLimitPeriod, which is
// documented as how often a node with the same ID can try to register a
// new session.
rateLimitPeriod time.Duration
// nodes maps a node ID to its registration record.
nodes map[string]*registeredNode
// mu presumably guards nodes; the locking sites are not visible in this
// view — TODO confirm.
mu sync.RWMutex
}

func newNodeStore(hbPeriod, hbEpsilon time.Duration, graceMultiplier int, rateLimitPeriod time.Duration) *nodeStore {
// newNodeStore returns an empty nodeStore configured with the given timing
// parameters.
//
// hbPeriod and hbEpsilon are passed to newPeriodChooser, graceMultiplier is the
// factor applied to the chosen heartbeat period, startupGracePeriod is the
// extra allowance stored for nodes added via AddUnknown, and rateLimitPeriod
// is stored unchanged.
func newNodeStore(hbPeriod, hbEpsilon time.Duration, graceMultiplier int, startupGracePeriod, rateLimitPeriod time.Duration) *nodeStore {
	ns := new(nodeStore)

	ns.nodes = make(map[string]*registeredNode)
	ns.periodChooser = newPeriodChooser(hbPeriod, hbEpsilon)

	// Converted to a Duration so it can scale a heartbeat period directly.
	ns.gracePeriodMultiplier = time.Duration(graceMultiplier)
	ns.startupGracePeriod = startupGracePeriod
	ns.rateLimitPeriod = rateLimitPeriod

	return ns
}
Expand All @@ -79,7 +83,8 @@ func (s *nodeStore) AddUnknown(n *api.Node, expireFunc func()) error {
Node: n,
}
s.nodes[n.ID] = rn
rn.Heartbeat = heartbeat.New(s.periodChooser.Choose()*s.gracePeriodMultiplier, expireFunc)
hb := s.periodChooser.Choose()*s.gracePeriodMultiplier + s.startupGracePeriod
rn.Heartbeat = heartbeat.New(hb, expireFunc)
return nil
}

Expand Down