Skip to content

Commit

Permalink
Merge pull request moby#2112 from ctelfer/graceful-lbrm
Browse files Browse the repository at this point in the history
Gracefully remove LB endpoints from services
  • Loading branch information
Flavio Crisciani committed Mar 19, 2018
2 parents aa61221 + 7d7412f commit e21dab8
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 106 deletions.
96 changes: 78 additions & 18 deletions libnetwork/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,14 +638,15 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
}

buf, err := proto.Marshal(&EndpointRecord{
Name: name,
ServiceName: ep.svcName,
ServiceID: ep.svcID,
VirtualIP: ep.virtualIP.String(),
IngressPorts: ingressPorts,
Aliases: ep.svcAliases,
TaskAliases: ep.myAliases,
EndpointIP: ep.Iface().Address().IP.String(),
Name: name,
ServiceName: ep.svcName,
ServiceID: ep.svcID,
VirtualIP: ep.virtualIP.String(),
IngressPorts: ingressPorts,
Aliases: ep.svcAliases,
TaskAliases: ep.myAliases,
EndpointIP: ep.Iface().Address().IP.String(),
ServiceDisabled: false,
})
if err != nil {
return err
Expand All @@ -663,7 +664,7 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
return nil
}

func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) error {
func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, fullRemove bool, method string) error {
if ep.isAnonymous() && len(ep.myAliases) == 0 {
return nil
}
Expand All @@ -677,6 +678,15 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
defer sb.Service.Unlock()
logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())

// Avoid a race with a container that aborts preemptively. This would
// get caught in disableServiceInNetworkDB, but we check here to make the
// nature of the condition more clear.
// See comment in addServiceInfoToCluster()
if e := sb.getEndpoint(ep.ID()); e == nil {
logrus.Warnf("deleteServiceInfoFromCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
return nil
}

c := n.getController()
agent := c.getAgent()

Expand All @@ -686,9 +696,13 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
}

if agent != nil {
// First delete from networkDB then locally
if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
logrus.Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
// First update the networkDB then locally
if fullRemove {
if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
logrus.Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
}
} else {
disableServiceInNetworkDB(agent, n, ep)
}
}

Expand All @@ -699,7 +713,7 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
if n.ingress {
ingressPorts = ep.ingressPorts
}
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true); err != nil {
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true, fullRemove); err != nil {
return err
}
} else {
Expand All @@ -715,6 +729,35 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
return nil
}

// disableServiceInNetworkDB flags the endpoint's record in the networkDB
// endpoint table as service-disabled instead of deleting it.
//
// It reads the entry stored under (n.ID(), ep.ID()), decodes it into an
// EndpointRecord, sets ServiceDisabled, re-encodes it and writes it back with
// UpdateEntry. The function is best-effort: any failure along the way is
// logged and the function returns without reporting an error to the caller.
func disableServiceInNetworkDB(a *agent, n *network, ep *endpoint) {
	logrus.Debugf("disableServiceInNetworkDB for %s %s", ep.svcName, ep.ID())

	// Start from the record that is currently stored so that every other
	// field is carried over unchanged.
	stored, err := a.networkDB.GetEntry(libnetworkEPTable, n.ID(), ep.ID())
	if err != nil {
		logrus.Warnf("disableServiceInNetworkDB GetEntry failed for %s %s err:%s", ep.id, n.id, err)
		return
	}

	// Decoding a record read back from the table is not expected to fail.
	var record EndpointRecord
	if err = proto.Unmarshal(stored, &record); err != nil {
		logrus.Errorf("disableServiceInNetworkDB unmarshal failed for %s %s err:%s", ep.id, n.id, err)
		return
	}

	record.ServiceDisabled = true

	// Re-encoding the just-decoded record is not expected to fail either.
	encoded, err := proto.Marshal(&record)
	if err != nil {
		logrus.Errorf("disableServiceInNetworkDB marshalling failed for %s %s err:%s", ep.id, n.id, err)
		return
	}

	// Send the modified record out as an update to the whole cluster.
	err = a.networkDB.UpdateEntry(libnetworkEPTable, n.ID(), ep.ID(), encoded)
	if err != nil {
		logrus.Warnf("disableServiceInNetworkDB UpdateEntry failed for %s %s err:%s", ep.id, n.id, err)
	}
}

func (n *network) addDriverWatches() {
if !n.isClusterEligible() {
return
Expand Down Expand Up @@ -844,7 +887,6 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
nid string
eid string
value []byte
isAdd bool
epRec EndpointRecord
)

Expand All @@ -853,12 +895,15 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
nid = event.NetworkID
eid = event.Key
value = event.Value
isAdd = true
case networkdb.DeleteEvent:
nid = event.NetworkID
eid = event.Key
value = event.Value
case networkdb.UpdateEvent:
nid = event.NetworkID
eid = event.Key
value = event.Value
default:
logrus.Errorf("Unexpected update service table event = %#v", event)
return
}
Expand All @@ -883,7 +928,8 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
return
}

if isAdd {
switch ev.(type) {
case networkdb.CreateEvent:
logrus.Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec)
if svcID != "" {
// This is a remote task part of a service
Expand All @@ -897,11 +943,12 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
logrus.Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err)
}
}
} else {

case networkdb.DeleteEvent:
logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
if svcID != "" {
// This is a remote task part of a service
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true); err != nil {
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {
logrus.Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
Expand All @@ -911,5 +958,18 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
logrus.Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err)
}
}
case networkdb.UpdateEvent:
logrus.Debugf("handleEpTableEvent UPD %s R:%v", eid, epRec)
// We currently should only get these to inform us that an endpoint
// is disabled. Report if otherwise.
if svcID == "" || !epRec.ServiceDisabled {
logrus.Errorf("Unexpected update table event for %s epRec:%v", eid, epRec)
return
}
// This is a remote task that is part of a service that is now disabled
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
logrus.Errorf("failed disabling service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
}
}
123 changes: 75 additions & 48 deletions libnetwork/agent.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions libnetwork/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ message EndpointRecord {

// List of task-specific aliases
repeated string task_aliases = 8;

// Whether this endpoint's service has been disabled
bool service_disabled = 9;
}

// PortConfig specifies an exposed port which can be
Expand Down

0 comments on commit e21dab8

Please sign in to comment.