Skip to content

Commit

Permalink
Merge pull request #1460 from himanshug/leader_shutdown
Browse files Browse the repository at this point in the history
Leader to give up leadership lock on graceful shutdown
  • Loading branch information
k8s-ci-robot committed Dec 29, 2020
2 parents 631f52c + a70bb73 commit 6449707
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 20 deletions.
Expand Up @@ -262,21 +262,46 @@ private boolean tryAcquireOrRenew() {
return false;
}

if (log.isDebugEnabled()) {
log.debug("Lock not found, try to create it");
}

// No Lock resource exists, try to get leadership by creating it
return createLock(lock, leaderElectionRecord);
}

// alright, we have an existing lock resource
// 1. Is Lock Empty? --> try to get leadership by updating it
// 2. Am I the Leader? --> update info and renew lease by updating it
// 3. I am not the Leader?
// 3.1 is Lock expired? --> try to get leadership by updating it
// 3.2 Lock not expired? --> update info, try later

if (oldLeaderElectionRecord == null
|| oldLeaderElectionRecord.getAcquireTime() == null
|| oldLeaderElectionRecord.getRenewTime() == null
|| oldLeaderElectionRecord.getHolderIdentity() == null) {
return createLock(lock, leaderElectionRecord);
// We found the lock resource with an empty LeaderElectionRecord, try to get leadership by
// updating it
if (log.isDebugEnabled()) {
log.debug("Update lock to get lease");
}

if (oldLeaderElectionRecord != null) {
// maintain the leaderTransitions
leaderElectionRecord.setLeaderTransitions(
oldLeaderElectionRecord.getLeaderTransitions() + 1);
}

return updateLock(lock, leaderElectionRecord);
}

// 2. Record obtained, check the Identity & Time
// 2. Record obtained with LeaderElectionRecord, check the Identity & Time
if (!oldLeaderElectionRecord.equals(this.observedRecord)) {
this.observedRecord = oldLeaderElectionRecord;
this.observedTimeMilliSeconds = System.currentTimeMillis();
}

if (observedTimeMilliSeconds + config.getLeaseDuration().toMillis() > now.getTime()
&& !isLeader()) {
if (log.isDebugEnabled()) {
Expand All @@ -296,26 +321,20 @@ private boolean tryAcquireOrRenew() {
leaderElectionRecord.setLeaderTransitions(oldLeaderElectionRecord.getLeaderTransitions() + 1);
}

// update the lock itself
if (log.isDebugEnabled()) {
log.debug("Update lock acquire time to keep lease");
log.debug("Update lock to renew lease");
}
boolean updateSuccess = config.getLock().update(leaderElectionRecord);
if (!updateSuccess) {
return false;
}
this.observedRecord = leaderElectionRecord;
this.observedTimeMilliSeconds = System.currentTimeMillis();
if (log.isDebugEnabled()) {

boolean renewalStatus = updateLock(lock, leaderElectionRecord);

if (renewalStatus && log.isDebugEnabled()) {
log.debug("TryAcquireOrRenew return success");
}
return true;

return renewalStatus;
}

private boolean createLock(Lock lock, LeaderElectionRecord leaderElectionRecord) {
if (log.isDebugEnabled()) {
log.debug("Lock not found, try to create it");
}
boolean createSuccess = lock.create(leaderElectionRecord);
if (!createSuccess) {
return false;
Expand All @@ -325,6 +344,16 @@ private boolean createLock(Lock lock, LeaderElectionRecord leaderElectionRecord)
return true;
}

private boolean updateLock(Lock lock, LeaderElectionRecord leaderElectionRecord) {
boolean updateSuccess = lock.update(leaderElectionRecord);
if (!updateSuccess) {
return false;
}
this.observedRecord = leaderElectionRecord;
this.observedTimeMilliSeconds = System.currentTimeMillis();
return true;
}

private boolean isLeader() {
return this.config.getLock().identity().equals(this.observedRecord.getHolderIdentity());
}
Expand All @@ -345,8 +374,53 @@ private void maybeReportTransition() {

@Override
public void close() {
log.info("Closing...");
scheduledWorkers.shutdownNow();
leaseWorkers.shutdownNow();
hookWorkers.shutdownNow();

// If I am the leader, free the lock so that other candidates can take it immediately
if (observedRecord != null && isLeader()) {

// First ensure that all executors have stopped
try {
boolean isTerminated =
scheduledWorkers.awaitTermination(
config.getRetryPeriod().getSeconds(), TimeUnit.SECONDS);
if (!isTerminated) {
log.warn("scheduledWorkers executor termination didn't finish.");
return;
}

isTerminated =
leaseWorkers.awaitTermination(config.getRetryPeriod().getSeconds(), TimeUnit.SECONDS);
if (!isTerminated) {
log.warn("leaseWorkers executor termination didn't finish.");
return;
}

isTerminated =
hookWorkers.awaitTermination(config.getRetryPeriod().getSeconds(), TimeUnit.SECONDS);
if (!isTerminated) {
log.warn("hookWorkers executor termination didn't finish.");
return;
}
} catch (InterruptedException ex) {
log.warn("Failed to ensure executors termination.", ex);
return;
}

log.info("Giving up the lock....");
LeaderElectionRecord emptyRecord = new LeaderElectionRecord();
// maintain leaderTransitions count
emptyRecord.setLeaderTransitions(observedRecord.getLeaderTransitions());
boolean status = this.config.getLock().update(emptyRecord);

if (!status) {
log.warn("Failed to give up the lock.");
}
}

log.info("Closed");
}
}
Expand Up @@ -59,6 +59,8 @@ public ConfigMapLock(String namespace, String name, String identity, ApiClient a
@Override
public LeaderElectionRecord get() throws ApiException {
V1ConfigMap configMap = coreV1Client.readNamespacedConfigMap(name, namespace, null, null, null);
configMapRefer.set(configMap);

Map<String, String> annotations = configMap.getMetadata().getAnnotations();
if (annotations == null || annotations.isEmpty()) {
configMap.getMetadata().setAnnotations(new HashMap<>());
Expand All @@ -74,7 +76,7 @@ public LeaderElectionRecord get() throws ApiException {
.getApiClient()
.getJSON()
.deserialize(recordRawStringContent, LeaderElectionRecord.class);
configMapRefer.set(configMap);

return record;
}

Expand Down
Expand Up @@ -59,6 +59,8 @@ public EndpointsLock(String namespace, String name, String identity, ApiClient a
@Override
public LeaderElectionRecord get() throws ApiException {
V1Endpoints endpoints = coreV1Client.readNamespacedEndpoints(name, namespace, null, null, null);
endpointsRefer.set(endpoints);

Map<String, String> annotations = endpoints.getMetadata().getAnnotations();
if (annotations == null || annotations.isEmpty()) {
endpoints.getMetadata().setAnnotations(new HashMap<>());
Expand All @@ -74,7 +76,6 @@ public LeaderElectionRecord get() throws ApiException {
.getApiClient()
.getJSON()
.deserialize(recordRawStringContent, LeaderElectionRecord.class);
endpointsRefer.set(endpoints);
return record;
}

Expand Down
Expand Up @@ -25,6 +25,7 @@
import io.kubernetes.client.extended.leaderelection.LeaderElector;
import io.kubernetes.client.extended.leaderelection.Lock;
import io.kubernetes.client.openapi.ApiException;
import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
Expand Down Expand Up @@ -56,8 +57,10 @@ public void testLeaderElectingController() throws ApiException {
record.set(new LeaderElectionRecord());

when(mockLock.identity()).thenReturn("foo");

doAnswer(invocationOnMock -> record.get()).when(mockLock).get();
when(mockLock.get())
.thenThrow(
new ApiException("Record Not Found", HttpURLConnection.HTTP_NOT_FOUND, null, null))
.thenReturn(record.get());

doAnswer(
invocationOnMock -> {
Expand Down
Expand Up @@ -17,6 +17,7 @@
import static org.mockito.Mockito.*;

import io.kubernetes.client.openapi.ApiException;
import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
Expand Down Expand Up @@ -412,9 +413,12 @@ public MockResourceLock(String iden) {
}

@Override
public LeaderElectionRecord get() {
public LeaderElectionRecord get() throws ApiException {
lock.lock();
try {
if (leaderRecord == null) {
throw new ApiException("Record Not Found", HttpURLConnection.HTTP_NOT_FOUND, null, null);
}
return leaderRecord;
} finally {
lock.unlock();
Expand Down
@@ -0,0 +1,80 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package io.kubernetes.client.extended.leaderelection;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

/** Leader Election tests using "simulated" locks created by {@link LockSmith} */
public class LeaderElectorTest {
/**
* Tests that when a leader candidate is stopped gracefully, second candidate immediately becomes
* leader.
*/
@Test(timeout = 20000L)
public void testLeaderGracefulShutdown() throws Exception {
LockSmith lockSmith = new LockSmith();

CountDownLatch startBeingLeader1 = new CountDownLatch(1);
CountDownLatch stopBeingLeader1 = new CountDownLatch(1);

LeaderElector leaderElector1 =
makeAndRunLeaderElectorAsync(lockSmith, "candidate1", startBeingLeader1, stopBeingLeader1);

// wait for candidate1 to become leader
startBeingLeader1.await();

CountDownLatch startBeingLeader2 = new CountDownLatch(1);
CountDownLatch stopBeingLeader2 = new CountDownLatch(1);

LeaderElector leaderElector2 =
makeAndRunLeaderElectorAsync(lockSmith, "candidate2", startBeingLeader2, stopBeingLeader2);

leaderElector1.close();

// ensure stopBeingLeader hook is called
stopBeingLeader1.await();

// wait for candidate2 to become leader
startBeingLeader2.await();

leaderElector2.close();
}

private LeaderElector makeAndRunLeaderElectorAsync(
LockSmith lockSmith,
String lockIdentity,
CountDownLatch startBeingLeader,
CountDownLatch stopBeingLeader) {
LeaderElectionConfig leaderElectionConfig =
new LeaderElectionConfig(
lockSmith.makeLock(lockIdentity),
Duration.ofMillis(TimeUnit.MINUTES.toMillis(1)),
Duration.ofMillis(TimeUnit.SECONDS.toMillis(51)),
Duration.ofMillis(TimeUnit.SECONDS.toMillis(3)));
LeaderElector leaderElector = new LeaderElector(leaderElectionConfig);

Thread thread =
new Thread(
() ->
leaderElector.run(
() -> startBeingLeader.countDown(), () -> stopBeingLeader.countDown()),
String.format("%s-leader-elector-main", lockIdentity));
thread.setDaemon(true);
thread.start();

return leaderElector;
}
}
@@ -0,0 +1,84 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package io.kubernetes.client.extended.leaderelection;

import io.kubernetes.client.openapi.ApiException;
import java.net.HttpURLConnection;
import java.util.concurrent.atomic.AtomicReference;

/** Makes simulated {@link Lock} objects that behave as if they were backed by real API server. */
public class LockSmith {
private AtomicReference<Resource> lockResourceRef = new AtomicReference<>();

public Lock makeLock(String identity) {
return new SimulatedLock(identity);
}

private class SimulatedLock implements Lock {
private final String identity;

public SimulatedLock(String identity) {
this.identity = identity;
}

@Override
public LeaderElectionRecord get() throws ApiException {
if (lockResourceRef.get() == null) {
throw new ApiException("Record Not Found", HttpURLConnection.HTTP_NOT_FOUND, null, null);
}

return lockResourceRef.get().record;
}

@Override
public boolean create(LeaderElectionRecord record) {
return lockResourceRef.compareAndSet(null, new Resource(record));
}

@Override
public boolean update(LeaderElectionRecord record) {
Resource res = lockResourceRef.get();
if (res == null) {
return false;
} else {
Resource newResource = new Resource(res.version + 1, record);
return lockResourceRef.compareAndSet(res, newResource);
}
}

@Override
public String identity() {
return identity;
}

@Override
public String describe() {
return "simulated/lock";
}
}

private static class Resource {
final int version;
final LeaderElectionRecord record;

public Resource(LeaderElectionRecord record) {
this.version = 0;
this.record = record;
}

public Resource(int version, LeaderElectionRecord record) {
this.version = version;
this.record = record;
}
}
}

0 comments on commit 6449707

Please sign in to comment.