Skip to content

Commit

Permalink
Merge pull request #1238 from aluzzardi/dispatcher-unknown-extra-grac…
Browse files Browse the repository at this point in the history
…eperiod

dispatcher: Give a couple of minutes for nodes to recover on re-election
  • Loading branch information
abronan committed Jul 26, 2016
2 parents 9d4c2f7 + 8b7de39 commit 5eca1b1
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 7 deletions.
11 changes: 7 additions & 4 deletions manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
defaultHeartBeatEpsilon = 500 * time.Millisecond
defaultGracePeriodMultiplier = 3
defaultRateLimitPeriod = 8 * time.Second
defaultStartupGracePeriod = 2 * time.Minute

// maxBatchItems is the threshold of queued writes that should
// trigger an actual transaction to commit them to the shared store.
Expand Down Expand Up @@ -61,9 +62,10 @@ var (
// DefautConfig.
type Config struct {
// Addr configures the address the dispatcher reports to agents.
Addr string
HeartbeatPeriod time.Duration
HeartbeatEpsilon time.Duration
Addr string
HeartbeatPeriod time.Duration
HeartbeatEpsilon time.Duration
StartupGracePeriod time.Duration
// RateLimitPeriod specifies how often node with same ID can try to register
// new session.
RateLimitPeriod time.Duration
Expand All @@ -75,6 +77,7 @@ func DefaultConfig() *Config {
return &Config{
HeartbeatPeriod: DefaultHeartBeatPeriod,
HeartbeatEpsilon: defaultHeartBeatEpsilon,
StartupGracePeriod: defaultStartupGracePeriod,
RateLimitPeriod: defaultRateLimitPeriod,
GracePeriodMultiplier: defaultGracePeriodMultiplier,
}
Expand Down Expand Up @@ -122,7 +125,7 @@ func (b weightedPeerByNodeID) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func New(cluster Cluster, c *Config) *Dispatcher {
return &Dispatcher{
addr: c.Addr,
nodes: newNodeStore(c.HeartbeatPeriod, c.HeartbeatEpsilon, c.GracePeriodMultiplier, c.RateLimitPeriod),
nodes: newNodeStore(c.HeartbeatPeriod, c.HeartbeatEpsilon, c.GracePeriodMultiplier, c.StartupGracePeriod, c.RateLimitPeriod),
store: cluster.MemoryStore(),
cluster: cluster,
mgrQueue: watch.NewQueue(16),
Expand Down
2 changes: 2 additions & 0 deletions manager/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func TestHeartbeatTimeout(t *testing.T) {
cfg := DefaultConfig()
cfg.HeartbeatPeriod = 100 * time.Millisecond
cfg.HeartbeatEpsilon = 0
cfg.StartupGracePeriod = 100 * time.Millisecond
gd, err := startDispatcher(cfg)
assert.NoError(t, err)
defer gd.Close()
Expand Down Expand Up @@ -737,6 +738,7 @@ func TestNodesCount(t *testing.T) {
cfg := DefaultConfig()
cfg.HeartbeatPeriod = 100 * time.Millisecond
cfg.HeartbeatEpsilon = 0
cfg.StartupGracePeriod = 100 * time.Millisecond
gd, err := startDispatcher(cfg)
assert.NoError(t, err)
defer gd.Close()
Expand Down
11 changes: 8 additions & 3 deletions manager/dispatcher/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"github.com/docker/swarmkit/manager/dispatcher/heartbeat"
)

const rateLimitCount = 3
const (
rateLimitCount = 3
)

type registeredNode struct {
SessionID string
Expand Down Expand Up @@ -45,16 +47,18 @@ func (rn *registeredNode) checkSessionID(sessionID string) error {
type nodeStore struct {
periodChooser *periodChooser
gracePeriodMultiplier time.Duration
startupGracePeriod time.Duration
rateLimitPeriod time.Duration
nodes map[string]*registeredNode
mu sync.RWMutex
}

func newNodeStore(hbPeriod, hbEpsilon time.Duration, graceMultiplier int, rateLimitPeriod time.Duration) *nodeStore {
func newNodeStore(hbPeriod, hbEpsilon time.Duration, graceMultiplier int, startupGracePeriod, rateLimitPeriod time.Duration) *nodeStore {
return &nodeStore{
nodes: make(map[string]*registeredNode),
periodChooser: newPeriodChooser(hbPeriod, hbEpsilon),
gracePeriodMultiplier: time.Duration(graceMultiplier),
startupGracePeriod: startupGracePeriod,
rateLimitPeriod: rateLimitPeriod,
}
}
Expand All @@ -79,7 +83,8 @@ func (s *nodeStore) AddUnknown(n *api.Node, expireFunc func()) error {
Node: n,
}
s.nodes[n.ID] = rn
rn.Heartbeat = heartbeat.New(s.periodChooser.Choose()*s.gracePeriodMultiplier, expireFunc)
hb := s.periodChooser.Choose()*s.gracePeriodMultiplier + s.startupGracePeriod
rn.Heartbeat = heartbeat.New(hb, expireFunc)
return nil
}

Expand Down

0 comments on commit 5eca1b1

Please sign in to comment.