Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leader to give up leadership lock on graceful shutdown #1460

Merged
merged 4 commits into from Dec 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -262,21 +262,46 @@ private boolean tryAcquireOrRenew() {
return false;
}

if (log.isDebugEnabled()) {
log.debug("Lock not found, try to create it");
}

// No Lock resource exists, try to get leadership by creating it
return createLock(lock, leaderElectionRecord);
}

// alright, we have an existing lock resource
// 1. Is Lock Empty? --> try to get leadership by updating it
// 2. Am I the Leader? --> update info and renew lease by updating it
// 3. I am not the Leader?
// 3.1 is Lock expired? --> try to get leadership by updating it
// 3.2 Lock not expired? --> update info, try later

if (oldLeaderElectionRecord == null
|| oldLeaderElectionRecord.getAcquireTime() == null
|| oldLeaderElectionRecord.getRenewTime() == null
|| oldLeaderElectionRecord.getHolderIdentity() == null) {
return createLock(lock, leaderElectionRecord);
// We found the lock resource with an empty LeaderElectionRecord, try to get leadership by
// updating it
if (log.isDebugEnabled()) {
log.debug("Update lock to get lease");
}

if (oldLeaderElectionRecord != null) {
// maintain the leaderTransitions
leaderElectionRecord.setLeaderTransitions(
oldLeaderElectionRecord.getLeaderTransitions() + 1);
}

return updateLock(lock, leaderElectionRecord);
}

// 2. Record obtained, check the Identity & Time
// 2. Record obtained with LeaderElectionRecord, check the Identity & Time
if (!oldLeaderElectionRecord.equals(this.observedRecord)) {
this.observedRecord = oldLeaderElectionRecord;
this.observedTimeMilliSeconds = System.currentTimeMillis();
}

if (observedTimeMilliSeconds + config.getLeaseDuration().toMillis() > now.getTime()
&& !isLeader()) {
if (log.isDebugEnabled()) {
Expand All @@ -296,26 +321,20 @@ private boolean tryAcquireOrRenew() {
leaderElectionRecord.setLeaderTransitions(oldLeaderElectionRecord.getLeaderTransitions() + 1);
}

// update the lock itself
if (log.isDebugEnabled()) {
log.debug("Update lock acquire time to keep lease");
log.debug("Update lock to renew lease");
}
boolean updateSuccess = config.getLock().update(leaderElectionRecord);
if (!updateSuccess) {
return false;
}
this.observedRecord = leaderElectionRecord;
this.observedTimeMilliSeconds = System.currentTimeMillis();
if (log.isDebugEnabled()) {

boolean renewalStatus = updateLock(lock, leaderElectionRecord);

if (renewalStatus && log.isDebugEnabled()) {
log.debug("TryAcquireOrRenew return success");
}
return true;

return renewalStatus;
}

private boolean createLock(Lock lock, LeaderElectionRecord leaderElectionRecord) {
if (log.isDebugEnabled()) {
log.debug("Lock not found, try to create it");
}
boolean createSuccess = lock.create(leaderElectionRecord);
if (!createSuccess) {
return false;
Expand All @@ -325,6 +344,16 @@ private boolean createLock(Lock lock, LeaderElectionRecord leaderElectionRecord)
return true;
}

private boolean updateLock(Lock lock, LeaderElectionRecord leaderElectionRecord) {
boolean updateSuccess = lock.update(leaderElectionRecord);
if (!updateSuccess) {
return false;
}
this.observedRecord = leaderElectionRecord;
this.observedTimeMilliSeconds = System.currentTimeMillis();
return true;
}

private boolean isLeader() {
return this.config.getLock().identity().equals(this.observedRecord.getHolderIdentity());
}
Expand All @@ -345,8 +374,53 @@ private void maybeReportTransition() {

@Override
public void close() {
log.info("Closing...");
scheduledWorkers.shutdownNow();
leaseWorkers.shutdownNow();
hookWorkers.shutdownNow();

// If I am the leader, free the lock so that other candidates can take it immediately
if (observedRecord != null && isLeader()) {

// First ensure that all executors have stopped
try {
boolean isTerminated =
scheduledWorkers.awaitTermination(
config.getRetryPeriod().getSeconds(), TimeUnit.SECONDS);
if (!isTerminated) {
log.warn("scheduledWorkers executor termination didn't finish.");
return;
}

isTerminated =
leaseWorkers.awaitTermination(config.getRetryPeriod().getSeconds(), TimeUnit.SECONDS);
if (!isTerminated) {
log.warn("leaseWorkers executor termination didn't finish.");
return;
}

isTerminated =
hookWorkers.awaitTermination(config.getRetryPeriod().getSeconds(), TimeUnit.SECONDS);
if (!isTerminated) {
log.warn("hookWorkers executor termination didn't finish.");
return;
}
} catch (InterruptedException ex) {
log.warn("Failed to ensure executors termination.", ex);
return;
}

log.info("Giving up the lock....");
LeaderElectionRecord emptyRecord = new LeaderElectionRecord();
// maintain leaderTransitions count
emptyRecord.setLeaderTransitions(observedRecord.getLeaderTransitions());
boolean status = this.config.getLock().update(emptyRecord);

if (!status) {
log.warn("Failed to give up the lock.");
}
}

log.info("Closed");
}
}
Expand Up @@ -59,6 +59,8 @@ public ConfigMapLock(String namespace, String name, String identity, ApiClient a
@Override
public LeaderElectionRecord get() throws ApiException {
V1ConfigMap configMap = coreV1Client.readNamespacedConfigMap(name, namespace, null, null, null);
configMapRefer.set(configMap);

Map<String, String> annotations = configMap.getMetadata().getAnnotations();
if (annotations == null || annotations.isEmpty()) {
configMap.getMetadata().setAnnotations(new HashMap<>());
Expand All @@ -74,7 +76,7 @@ public LeaderElectionRecord get() throws ApiException {
.getApiClient()
.getJSON()
.deserialize(recordRawStringContent, LeaderElectionRecord.class);
configMapRefer.set(configMap);

return record;
}

Expand Down
Expand Up @@ -59,6 +59,8 @@ public EndpointsLock(String namespace, String name, String identity, ApiClient a
@Override
public LeaderElectionRecord get() throws ApiException {
V1Endpoints endpoints = coreV1Client.readNamespacedEndpoints(name, namespace, null, null, null);
endpointsRefer.set(endpoints);

Map<String, String> annotations = endpoints.getMetadata().getAnnotations();
if (annotations == null || annotations.isEmpty()) {
endpoints.getMetadata().setAnnotations(new HashMap<>());
Expand All @@ -74,7 +76,6 @@ public LeaderElectionRecord get() throws ApiException {
.getApiClient()
.getJSON()
.deserialize(recordRawStringContent, LeaderElectionRecord.class);
endpointsRefer.set(endpoints);
return record;
}

Expand Down
Expand Up @@ -25,6 +25,7 @@
import io.kubernetes.client.extended.leaderelection.LeaderElector;
import io.kubernetes.client.extended.leaderelection.Lock;
import io.kubernetes.client.openapi.ApiException;
import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
Expand Down Expand Up @@ -56,8 +57,10 @@ public void testLeaderElectingController() throws ApiException {
record.set(new LeaderElectionRecord());

when(mockLock.identity()).thenReturn("foo");

doAnswer(invocationOnMock -> record.get()).when(mockLock).get();
when(mockLock.get())
.thenThrow(
new ApiException("Record Not Found", HttpURLConnection.HTTP_NOT_FOUND, null, null))
.thenReturn(record.get());

doAnswer(
invocationOnMock -> {
Expand Down
Expand Up @@ -17,6 +17,7 @@
import static org.mockito.Mockito.*;

import io.kubernetes.client.openapi.ApiException;
import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
Expand Down Expand Up @@ -412,9 +413,12 @@ public MockResourceLock(String iden) {
}

@Override
public LeaderElectionRecord get() {
public LeaderElectionRecord get() throws ApiException {
lock.lock();
try {
if (leaderRecord == null) {
throw new ApiException("Record Not Found", HttpURLConnection.HTTP_NOT_FOUND, null, null);
}
return leaderRecord;
} finally {
lock.unlock();
Expand Down
@@ -0,0 +1,80 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package io.kubernetes.client.extended.leaderelection;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

/** Leader Election tests using "simulated" locks created by {@link LockSmith} */
public class LeaderElectorTest {
/**
* Tests that when a leader candidate is stopped gracefully, second candidate immediately becomes
* leader.
*/
@Test(timeout = 20000L)
public void testLeaderGracefulShutdown() throws Exception {
LockSmith lockSmith = new LockSmith();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like there is value in using WireMock and actually talking HTTP for the locks. How hard is it to switch to testing using WireMock like we do elsewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review.

Here we are more interested in testing the code of LeaderElector which can be tested as long as a well-behaving Lock implementation is provided. Also, truth is that it was just super simple/quick to do and does the job :) ... and followed in spirit the existing unit tests for leader election related code.

With WireMock, we would actually also test the implementation of Lock itself i.e. ConfigMapLock class etc. I haven't used WireMock in the past, so not sure about the work required, but will try to look into it.

that said, not sure but, I see there is an e2e build that appears to setup a real k8s cluster, how would you feel about this test running as an e2e integration test against real k8s cluster instead of the WireMock ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e2e integration is even better :)


CountDownLatch startBeingLeader1 = new CountDownLatch(1);
CountDownLatch stopBeingLeader1 = new CountDownLatch(1);

LeaderElector leaderElector1 =
makeAndRunLeaderElectorAsync(lockSmith, "candidate1", startBeingLeader1, stopBeingLeader1);

// wait for candidate1 to become leader
startBeingLeader1.await();

CountDownLatch startBeingLeader2 = new CountDownLatch(1);
CountDownLatch stopBeingLeader2 = new CountDownLatch(1);

LeaderElector leaderElector2 =
makeAndRunLeaderElectorAsync(lockSmith, "candidate2", startBeingLeader2, stopBeingLeader2);

leaderElector1.close();

// ensure stopBeingLeader hook is called
stopBeingLeader1.await();

// wait for candidate2 to become leader
startBeingLeader2.await();

leaderElector2.close();
}

private LeaderElector makeAndRunLeaderElectorAsync(
LockSmith lockSmith,
String lockIdentity,
CountDownLatch startBeingLeader,
CountDownLatch stopBeingLeader) {
LeaderElectionConfig leaderElectionConfig =
new LeaderElectionConfig(
lockSmith.makeLock(lockIdentity),
Duration.ofMillis(TimeUnit.MINUTES.toMillis(1)),
Duration.ofMillis(TimeUnit.SECONDS.toMillis(51)),
Duration.ofMillis(TimeUnit.SECONDS.toMillis(3)));
LeaderElector leaderElector = new LeaderElector(leaderElectionConfig);

Thread thread =
new Thread(
() ->
leaderElector.run(
() -> startBeingLeader.countDown(), () -> stopBeingLeader.countDown()),
String.format("%s-leader-elector-main", lockIdentity));
thread.setDaemon(true);
thread.start();

return leaderElector;
}
}
@@ -0,0 +1,84 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package io.kubernetes.client.extended.leaderelection;

import io.kubernetes.client.openapi.ApiException;
import java.net.HttpURLConnection;
import java.util.concurrent.atomic.AtomicReference;

/** Makes simulated {@link Lock} objects that behave as if they were backed by real API server. */
public class LockSmith {
private AtomicReference<Resource> lockResourceRef = new AtomicReference<>();

public Lock makeLock(String identity) {
return new SimulatedLock(identity);
}

private class SimulatedLock implements Lock {
private final String identity;

public SimulatedLock(String identity) {
this.identity = identity;
}

@Override
public LeaderElectionRecord get() throws ApiException {
if (lockResourceRef.get() == null) {
throw new ApiException("Record Not Found", HttpURLConnection.HTTP_NOT_FOUND, null, null);
}

return lockResourceRef.get().record;
}

@Override
public boolean create(LeaderElectionRecord record) {
return lockResourceRef.compareAndSet(null, new Resource(record));
}

@Override
public boolean update(LeaderElectionRecord record) {
Resource res = lockResourceRef.get();
if (res == null) {
return false;
} else {
Resource newResource = new Resource(res.version + 1, record);
return lockResourceRef.compareAndSet(res, newResource);
}
}

@Override
public String identity() {
return identity;
}

@Override
public String describe() {
return "simulated/lock";
}
}

private static class Resource {
final int version;
final LeaderElectionRecord record;

public Resource(LeaderElectionRecord record) {
this.version = 0;
this.record = record;
}

public Resource(int version, LeaderElectionRecord record) {
this.version = version;
this.record = record;
}
}
}