From 9e08762abd38674e87ff7cca8652dca3efea5a08 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 29 Dec 2020 06:56:21 -0800 Subject: [PATCH] leaderelection sanity e2e integration tests --- e2e/pom.xml | 36 +- .../leaderelection/LeaderElectorTest.java | 337 ++++++++++++++++++ .../leaderelection/LeaderElector.java | 3 + .../resourcelock/LeaseLock.java | 8 +- 4 files changed, 368 insertions(+), 16 deletions(-) create mode 100644 e2e/src/test/java/io/kubernetes/client/e2e/extended/leaderelection/LeaderElectorTest.java diff --git a/e2e/pom.xml b/e2e/pom.xml index d56554e749..510fd468b9 100644 --- a/e2e/pom.xml +++ b/e2e/pom.xml @@ -14,6 +14,9 @@ ../pom.xml + + 2.0-M4-groovy-3.0 + io.kubernetes @@ -29,7 +32,27 @@ org.spockframework spock-core - 2.0-M4-groovy-3.0 + ${spock.version} + test + + + junit + junit + test + + + + + + org.junit.vintage + junit-vintage-engine + 5.7.0 + test + + + org.spockframework + spock-junit4 + ${spock.version} test @@ -51,19 +74,8 @@ org.codehaus.gmavenplus gmavenplus-plugin - - - - ${project.basedir}/src/test/groovy - - **/*.groovy - - - - - compileTests diff --git a/e2e/src/test/java/io/kubernetes/client/e2e/extended/leaderelection/LeaderElectorTest.java b/e2e/src/test/java/io/kubernetes/client/e2e/extended/leaderelection/LeaderElectorTest.java new file mode 100644 index 0000000000..e2e3263b77 --- /dev/null +++ b/e2e/src/test/java/io/kubernetes/client/e2e/extended/leaderelection/LeaderElectorTest.java @@ -0,0 +1,337 @@ +/* +Copyright 2020 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.kubernetes.client.e2e.extended.leaderelection; + +import io.kubernetes.client.extended.leaderelection.LeaderElectionConfig; +import io.kubernetes.client.extended.leaderelection.LeaderElector; +import io.kubernetes.client.extended.leaderelection.Lock; +import io.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock; +import io.kubernetes.client.extended.leaderelection.resourcelock.EndpointsLock; +import io.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.CoordinationV1Api; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.util.ClientBuilder; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.joda.time.format.*; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class LeaderElectorTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(LeaderElectorTest.class); + + private static final String LOCK_RESOURCE_NAME = "leader-election-it"; + private static final String NAMESPACE = "default"; + + private enum LockType { + ConfigMap, + Endpoints, + Lease + } + + @Parameterized.Parameters(name = "{0}") + public static Collection constructorFeeder() { + final List args = new ArrayList<>(); + + args.add(new Object[] {LockType.ConfigMap}); + args.add(new Object[] {LockType.Endpoints}); + args.add(new Object[] {LockType.Lease}); + + return args; + } + + private final ApiClient apiClient; + private final LockType lockType; + + public LeaderElectorTest(LockType lockType) { + try { + apiClient = ClientBuilder.defaultClient(); + } catch (IOException ex) { + throw new RuntimeException("Couldn't create ApiClient", ex); + } + this.lockType = lockType; + + // Lease resource requires special care with DateTime + if (lockType == LockType.Lease) { + // TODO: switch date-time library so that micro-sec timestamp can be serialized in RFC3339 + // format w/ correct precision without the hacks + + // This formatter is used for Lease resource spec's acquire/renewTime + DateTimeFormatter isoWithFractionalMicroSecsFormatter = + DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'"); + + DateTimeFormatter formatter = + new DateTimeFormatterBuilder() + .append( + isoWithFractionalMicroSecsFormatter.getPrinter(), + new DateTimeParser[] { + isoWithFractionalMicroSecsFormatter.getParser(), + // need this one to parse "creationTimestamp" format e.g. "2020-12-30T09:29:13Z" + // in Lease resource returned from server + ISODateTimeFormat.dateOptionalTimeParser().getParser(), + }) + .toFormatter(); + + apiClient.setDateTimeFormat(formatter); + } + } + + @Before + public void setup() throws Exception { + // delete the lock resource if it exists, or else first leader candidate might need to wait for + // a whole + // leaseDuration configured + switch (lockType) { + case ConfigMap: + deleteConfigMapLockResource(); + break; + case Endpoints: + deleteEndpointsLockResource(); + break; + case Lease: + deleteLeaseLockResource(); + break; + default: + throw new RuntimeException("Unknown LockType " + lockType); + } + } + + @Test(timeout = 30000L) + public void testSingleCandidateLeaderElection() throws Exception { + CountDownLatch startLeadershipLatch = new CountDownLatch(1); + CountDownLatch stopLeadershipLatch = new CountDownLatch(1); + + LeaderElector leaderElector = + makeAndRunLeaderElectorAsync( + "candidate", + null, + () -> startLeadershipLatch.countDown(), + () -> stopLeadershipLatch.countDown(), + apiClient); + + startLeadershipLatch.await(); + + leaderElector.close(); + + stopLeadershipLatch.await(); + } + + @Test(timeout = 30000L) + public void testMultiCandidateLeaderElection() throws Exception { + CyclicBarrier startBarrier = new CyclicBarrier(2); + + CountDownLatch startBeingLeader = new CountDownLatch(1); + CountDownLatch stopBeingLeader = new CountDownLatch(1); + + AtomicInteger startBeingLeaderCount = new AtomicInteger(); + AtomicInteger stopBeingLeaderCount = new AtomicInteger(); + + AtomicReference leaderRef = new AtomicReference<>(); + + String candidate1 = "candidate1"; + String candidate2 = "candidate2"; + + LeaderElector leaderElector1 = + makeAndRunLeaderElectorAsync( + candidate1, + startBarrier, + () -> { + startBeingLeaderCount.incrementAndGet(); + leaderRef.set(candidate1); + startBeingLeader.countDown(); + }, + () -> { + stopBeingLeaderCount.incrementAndGet(); + stopBeingLeader.countDown(); + }, + apiClient); + + LeaderElector leaderElector2 = + makeAndRunLeaderElectorAsync( + candidate2, + startBarrier, + () -> { + startBeingLeaderCount.incrementAndGet(); + leaderRef.set(candidate2); + startBeingLeader.countDown(); + }, + () -> { + stopBeingLeaderCount.incrementAndGet(); + stopBeingLeader.countDown(); + }, + apiClient); + + // wait till someone becomes leader + startBeingLeader.await(); + Assert.assertNotNull(leaderRef.get()); + Assert.assertTrue(candidate1.equals(leaderRef.get()) || candidate2.equals(leaderRef.get())); + + // stop both LeaderElectors, in order .. non-leader, then leader so that non-leader doesn't get + // to become leader + if (candidate1.equals(leaderRef.get())) { + leaderElector2.close(); + leaderElector1.close(); + } else { + leaderElector1.close(); + leaderElector2.close(); + } + + stopBeingLeader.await(); + + // make sure that only one candidate became leader + Assert.assertEquals(1, startBeingLeaderCount.get()); + Assert.assertEquals(1, stopBeingLeaderCount.get()); + } + + @Test(timeout = 30000L) + public void testLeaderGracefulShutdown() throws Exception { + CountDownLatch startBeingLeader1 = new CountDownLatch(1); + CountDownLatch stopBeingLeader1 = new CountDownLatch(1); + + LeaderElector leaderElector1 = + makeAndRunLeaderElectorAsync( + "candidate1", + null, + () -> startBeingLeader1.countDown(), + () -> stopBeingLeader1.countDown(), + apiClient); + + // wait for candidate1 to become leader + startBeingLeader1.await(); + + CountDownLatch startBeingLeader2 = new CountDownLatch(1); + CountDownLatch stopBeingLeader2 = new CountDownLatch(1); + + LeaderElector leaderElector2 = + makeAndRunLeaderElectorAsync( + "candidate2", + null, + () -> startBeingLeader2.countDown(), + () -> stopBeingLeader2.countDown(), + apiClient); + + leaderElector1.close(); + + // ensure stopBeingLeader hook is called + stopBeingLeader1.await(); + + // wait for candidate2 to become leader + startBeingLeader2.await(); + + leaderElector2.close(); + } + + private LeaderElector makeAndRunLeaderElectorAsync( + String candidateId, + CyclicBarrier startBarrier, + Runnable startBeingLeader, + Runnable stopBeingLeader, + ApiClient apiClient) { + + Lock lock = makeLock(candidateId, NAMESPACE, LOCK_RESOURCE_NAME, apiClient); + + LeaderElectionConfig leaderElectionConfig = + new LeaderElectionConfig( + lock, Duration.ofSeconds(30), Duration.ofSeconds(23), Duration.ofSeconds(3)); + LeaderElector leaderElector = new LeaderElector(leaderElectionConfig); + + Thread thread = + new Thread( + () -> { + if (startBarrier != null) { + try { + startBarrier.await(); + } catch (InterruptedException | BrokenBarrierException ex) { + LOGGER.error("startBarrier.await() failed", ex); + return; + } + } + + leaderElector.run(startBeingLeader, stopBeingLeader); + }, + String.format("%s-leader-elector-main", candidateId)); + thread.setDaemon(true); + thread.start(); + + return leaderElector; + } + + private Lock makeLock( + String candidateId, String namespace, String lockResourceName, ApiClient k8sApiClient) { + + switch (lockType) { + case ConfigMap: + return new ConfigMapLock(namespace, lockResourceName, candidateId, k8sApiClient); + case Endpoints: + return new EndpointsLock(namespace, lockResourceName, candidateId, k8sApiClient); + case Lease: + return new LeaseLock(namespace, lockResourceName, candidateId, k8sApiClient); + default: + throw new RuntimeException("Unknown LockType " + lockType); + } + } + + private void deleteConfigMapLockResource() throws Exception { + try { + CoreV1Api coreV1Api = new CoreV1Api(apiClient); + coreV1Api.deleteNamespacedConfigMap( + LOCK_RESOURCE_NAME, NAMESPACE, null, null, null, null, null, null); + } catch (ApiException ex) { + if (ex.getCode() != HttpURLConnection.HTTP_NOT_FOUND) { + throw ex; + } + } + } + + private void deleteEndpointsLockResource() throws Exception { + try { + CoreV1Api coreV1Api = new CoreV1Api(apiClient); + coreV1Api.deleteNamespacedEndpoints( + LOCK_RESOURCE_NAME, NAMESPACE, null, null, null, null, null, null); + } catch (ApiException ex) { + if (ex.getCode() != HttpURLConnection.HTTP_NOT_FOUND) { + throw ex; + } + } + } + + private void deleteLeaseLockResource() throws Exception { + try { + CoordinationV1Api coordinationV1Api = new CoordinationV1Api(apiClient); + coordinationV1Api.deleteNamespacedLease( + LOCK_RESOURCE_NAME, NAMESPACE, null, null, null, null, null, null); + } catch (ApiException ex) { + if (ex.getCode() != HttpURLConnection.HTTP_NOT_FOUND) { + throw ex; + } + } + } +} diff --git a/extended/src/main/java/io/kubernetes/client/extended/leaderelection/LeaderElector.java b/extended/src/main/java/io/kubernetes/client/extended/leaderelection/LeaderElector.java index 20cf090c52..ccafc65df6 100644 --- a/extended/src/main/java/io/kubernetes/client/extended/leaderelection/LeaderElector.java +++ b/extended/src/main/java/io/kubernetes/client/extended/leaderelection/LeaderElector.java @@ -414,6 +414,9 @@ public void close() { LeaderElectionRecord emptyRecord = new LeaderElectionRecord(); // maintain leaderTransitions count emptyRecord.setLeaderTransitions(observedRecord.getLeaderTransitions()); + // LeaseLock impl requires a non-zero value for leaseDuration + emptyRecord.setLeaseDurationSeconds( + Long.valueOf(config.getLeaseDuration().getSeconds()).intValue()); boolean status = this.config.getLock().update(emptyRecord); if (!status) { diff --git a/extended/src/main/java/io/kubernetes/client/extended/leaderelection/resourcelock/LeaseLock.java b/extended/src/main/java/io/kubernetes/client/extended/leaderelection/resourcelock/LeaseLock.java index 3a7b74d2d9..b1f7486c0e 100644 --- a/extended/src/main/java/io/kubernetes/client/extended/leaderelection/resourcelock/LeaseLock.java +++ b/extended/src/main/java/io/kubernetes/client/extended/leaderelection/resourcelock/LeaseLock.java @@ -80,9 +80,9 @@ public boolean create(LeaderElectionRecord record) { return true; } catch (ApiException e) { if (e.getCode() == HttpURLConnection.HTTP_CONFLICT) { - log.debug("received {} when creating configmap lock", e.getCode(), e); + log.debug("received {} when creating lease lock", e.getCode(), e); } else { - log.error("received {} when creating configmap lock", e.getCode(), e); + log.error("received {} when creating lease lock", e.getCode(), e); } return false; } @@ -99,9 +99,9 @@ public boolean update(LeaderElectionRecord record) { return true; } catch (ApiException e) { if (e.getCode() == HttpURLConnection.HTTP_CONFLICT) { - log.debug("received {} when creating configmap lock", e.getCode(), e); + log.debug("received {} when updating lease lock", e.getCode(), e); } else { - log.error("received {} when creating configmap lock", e.getCode(), e); + log.error("received {} when updating lease lock", e.getCode(), e); } return false; }