Skip to content

Commit

Permalink
fix formatting errors
Browse files Browse the repository at this point in the history
  • Loading branch information
himanshug committed Dec 30, 2020
1 parent 0beea47 commit c226e6b
Showing 1 changed file with 62 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,6 @@
import io.kubernetes.client.openapi.apis.CoordinationV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.util.ClientBuilder;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.time.Duration;
Expand All @@ -40,9 +32,15 @@
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
public class LeaderElectorTest {
Expand All @@ -53,11 +51,13 @@ public class LeaderElectorTest {
private static final String NAMESPACE = "default";

private enum LockType {
ConfigMap, Endpoints, Lease
ConfigMap,
Endpoints,
Lease
}

@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder()
{
public static Collection<Object[]> constructorFeeder() {
final List<Object[]> args = new ArrayList<>();

args.add(new Object[] {LockType.ConfigMap});
Expand All @@ -81,7 +81,8 @@ public LeaderElectorTest(LockType lockType) {

@Before
public void setup() throws Exception {
// delete the lock resource if it exists, or else first leader candidate might need to wait for a whole
// delete the lock resource if it exists, or else first leader candidate might need to wait for
// a whole
// leaseDuration configured
switch (lockType) {
case ConfigMap:
Expand All @@ -103,13 +104,13 @@ public void testSingleCandidateLeaderElection() throws Exception {
CountDownLatch startLeadershipLatch = new CountDownLatch(1);
CountDownLatch stopLeadershipLatch = new CountDownLatch(1);

LeaderElector leaderElector = makeAndRunLeaderElectorAsync(
LeaderElector leaderElector =
makeAndRunLeaderElectorAsync(
"candidate",
null,
() -> startLeadershipLatch.countDown(),
() -> stopLeadershipLatch.countDown(),
apiClient
);
apiClient);

startLeadershipLatch.await();

Expand All @@ -133,7 +134,8 @@ public void testMultiCandidateLeaderElection() throws Exception {
String candidate1 = "candidate1";
String candidate2 = "candidate2";

LeaderElector leaderElector1 = makeAndRunLeaderElectorAsync(
LeaderElector leaderElector1 =
makeAndRunLeaderElectorAsync(
candidate1,
startBarrier,
() -> {
Expand All @@ -145,10 +147,10 @@ public void testMultiCandidateLeaderElection() throws Exception {
stopBeingLeaderCount.incrementAndGet();
stopBeingLeader.countDown();
},
apiClient
);
apiClient);

LeaderElector leaderElector2 = makeAndRunLeaderElectorAsync(
LeaderElector leaderElector2 =
makeAndRunLeaderElectorAsync(
candidate2,
startBarrier,
() -> {
Expand All @@ -160,15 +162,15 @@ public void testMultiCandidateLeaderElection() throws Exception {
stopBeingLeaderCount.incrementAndGet();
stopBeingLeader.countDown();
},
apiClient
);
apiClient);

// wait till someone becomes leader
startBeingLeader.await();
Assert.assertNotNull(leaderRef.get());
Assert.assertTrue(candidate1.equals(leaderRef.get()) || candidate2.equals(leaderRef.get()));

// stop both LeaderElectors, in order .. non-leader, then leader so that non-leader doesn't get to become leader
// stop both LeaderElectors, in order .. non-leader, then leader so that non-leader doesn't get
// to become leader
if (candidate1.equals(leaderRef.get())) {
leaderElector2.close();
leaderElector1.close();
Expand All @@ -189,27 +191,27 @@ public void testLeaderGracefulShutdown() throws Exception {
CountDownLatch startBeingLeader1 = new CountDownLatch(1);
CountDownLatch stopBeingLeader1 = new CountDownLatch(1);

LeaderElector leaderElector1 = makeAndRunLeaderElectorAsync(
LeaderElector leaderElector1 =
makeAndRunLeaderElectorAsync(
"candidate1",
null,
() -> startBeingLeader1.countDown(),
() -> stopBeingLeader1.countDown(),
apiClient
);
apiClient);

// wait for candidate1 to become leader
startBeingLeader1.await();

CountDownLatch startBeingLeader2 = new CountDownLatch(1);
CountDownLatch stopBeingLeader2 = new CountDownLatch(1);

LeaderElector leaderElector2 = makeAndRunLeaderElectorAsync(
LeaderElector leaderElector2 =
makeAndRunLeaderElectorAsync(
"candidate2",
null,
() -> startBeingLeader2.countDown(),
() -> stopBeingLeader2.countDown(),
apiClient
);
apiClient);

leaderElector1.close();

Expand All @@ -223,71 +225,50 @@ public void testLeaderGracefulShutdown() throws Exception {
}

private LeaderElector makeAndRunLeaderElectorAsync(
String candidateId,
CyclicBarrier startBarrier,
Runnable startBeingLeader,
Runnable stopBeingLeader,
ApiClient apiClient) {
String candidateId,
CyclicBarrier startBarrier,
Runnable startBeingLeader,
Runnable stopBeingLeader,
ApiClient apiClient) {

Lock lock = makeLock(candidateId, NAMESPACE, LOCK_RESOURCE_NAME, apiClient);

LeaderElectionConfig leaderElectionConfig =
new LeaderElectionConfig(
lock,
Duration.ofSeconds(30),
Duration.ofSeconds(23),
Duration.ofSeconds(3));
new LeaderElectionConfig(
lock, Duration.ofSeconds(30), Duration.ofSeconds(23), Duration.ofSeconds(3));
LeaderElector leaderElector = new LeaderElector(leaderElectionConfig);

Thread thread =
new Thread(
() -> {
if (startBarrier != null) {
try {
startBarrier.await();
} catch (InterruptedException | BrokenBarrierException ex) {
LOGGER.error("startBarrier.await() failed", ex);
return;
}
}

leaderElector.run(startBeingLeader, stopBeingLeader);
},
String.format("%s-leader-elector-main", candidateId));
new Thread(
() -> {
if (startBarrier != null) {
try {
startBarrier.await();
} catch (InterruptedException | BrokenBarrierException ex) {
LOGGER.error("startBarrier.await() failed", ex);
return;
}
}

leaderElector.run(startBeingLeader, stopBeingLeader);
},
String.format("%s-leader-elector-main", candidateId));
thread.setDaemon(true);
thread.start();

return leaderElector;
}

private Lock makeLock(
String candidateId,
String namespace,
String lockResourceName,
ApiClient k8sApiClient) {
String candidateId, String namespace, String lockResourceName, ApiClient k8sApiClient) {

switch (lockType) {
case ConfigMap:
return new ConfigMapLock(
namespace,
lockResourceName,
candidateId,
k8sApiClient
);
return new ConfigMapLock(namespace, lockResourceName, candidateId, k8sApiClient);
case Endpoints:
return new EndpointsLock(
namespace,
lockResourceName,
candidateId,
k8sApiClient
);
return new EndpointsLock(namespace, lockResourceName, candidateId, k8sApiClient);
case Lease:
return new LeaseLock(
namespace,
lockResourceName,
candidateId,
k8sApiClient
);
return new LeaseLock(namespace, lockResourceName, candidateId, k8sApiClient);
default:
throw new RuntimeException("Unknown LockType " + lockType);
}
Expand All @@ -297,17 +278,8 @@ private void deleteConfigMapLockResource() throws Exception {
try {
CoreV1Api coreV1Api = new CoreV1Api(apiClient);
coreV1Api.deleteNamespacedConfigMap(
LOCK_RESOURCE_NAME,
NAMESPACE,
null,
null,
null,
null,
null,
null
);
}
catch (ApiException ex) {
LOCK_RESOURCE_NAME, NAMESPACE, null, null, null, null, null, null);
} catch (ApiException ex) {
if (ex.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
throw ex;
}
Expand All @@ -318,17 +290,8 @@ private void deleteEndpointsLockResource() throws Exception {
try {
CoreV1Api coreV1Api = new CoreV1Api(apiClient);
coreV1Api.deleteNamespacedEndpoints(
LOCK_RESOURCE_NAME,
NAMESPACE,
null,
null,
null,
null,
null,
null
);
}
catch (ApiException ex) {
LOCK_RESOURCE_NAME, NAMESPACE, null, null, null, null, null, null);
} catch (ApiException ex) {
if (ex.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
throw ex;
}
Expand All @@ -339,17 +302,8 @@ private void deleteLeaseLockResource() throws Exception {
try {
CoordinationV1Api coordinationV1Api = new CoordinationV1Api(apiClient);
coordinationV1Api.deleteNamespacedLease(
LOCK_RESOURCE_NAME,
NAMESPACE,
null,
null,
null,
null,
null,
null
);
}
catch (ApiException ex) {
LOCK_RESOURCE_NAME, NAMESPACE, null, null, null, null, null, null);
} catch (ApiException ex) {
if (ex.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
throw ex;
}
Expand Down

0 comments on commit c226e6b

Please sign in to comment.