Skip to content

Commit

Permalink
fix #4254: implementing release on cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Aug 8, 2022
1 parent dafd855 commit f4cb990
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* Fix #4041: adding Quantity.getNumericalAmount with an explanation about bytes and cores.
* Fix #4241: added more context to informer logs with the endpoint path
* Fix #4250: allowing for deserialization of polymorphic unwrapped fields
* Fix #4254: implementing release on cancel for leader election
* Fix #4259: Java Generator's CR should have Lombok's `@EqualsAndHashCode` with `callSuper = true`

#### Dependency Upgrade
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ public CompletableFuture<?> start() {
CompletableFuture<?> renewFuture = renewWithTimeout();
result.whenComplete((v1, t1) -> renewFuture.cancel(true));
renewFuture.whenComplete((v1, t1) -> {
LeaderElectionRecord current = observedRecord.get();
if (current != null && Objects.equals(current.getHolderIdentity(), leaderElectionConfig.getLock().identity())) {
stopLeading(current);
}
if (t1 != null) {
result.completeExceptionally(t1);
} else {
Expand All @@ -101,15 +105,32 @@ public CompletableFuture<?> start() {
});
result.whenComplete((v, t) -> {
acquireFuture.cancel(true);
LeaderElectionRecord current = observedRecord.get();
// if cancelled we still want to notify of stopping leadership
if (current != null && Objects.equals(current.getHolderIdentity(), leaderElectionConfig.getLock().identity())) {
leaderElectionConfig.getLeaderCallbacks().onStopLeading();
}
});
return result;
}

private void stopLeading(LeaderElectionRecord current) {
try {
if (leaderElectionConfig.isReleaseOnCancel()) {
final LeaderElectionRecord newLeaderElectionRecord = new LeaderElectionRecord(
leaderElectionConfig.getLock().identity(),
Duration.ZERO,
current.getAcquireTime(),
current.getRenewTime(),
current.getLeaderTransitions());
newLeaderElectionRecord.setVersion(current.getVersion());

leaderElectionConfig.getLock().update(kubernetesClient, newLeaderElectionRecord);
}
} catch (LockException | KubernetesClientException e) {
final String lockDescription = leaderElectionConfig.getLock().describe();
LOGGER.error("Exception occurred while releasing lock '{}'", lockDescription, e);
} finally {
// called regardless of isReleaseOnCancel
leaderElectionConfig.getLeaderCallbacks().onStopLeading();
}
}

private CompletableFuture<Void> acquire() {
final String lockDescription = leaderElectionConfig.getLock().describe();
LOGGER.debug("Attempting to acquire leader lease '{}'...", lockDescription);
Expand Down Expand Up @@ -171,7 +192,7 @@ private boolean tryAcquireOrRenew() throws LockException {
leaderElectionConfig.getLeaseDuration(),
isLeader ? oldLeaderElectionRecord.getAcquireTime() : now,
now,
isLeader ? (oldLeaderElectionRecord.getLeaderTransitions() + 1) : 0);
oldLeaderElectionRecord.getLeaderTransitions() + (isLeader ? 0 : 1));
newLeaderElectionRecord.setVersion(oldLeaderElectionRecord.getVersion());
leaderElectionConfig.getLock().update(kubernetesClient, newLeaderElectionRecord);
updateObserved(newLeaderElectionRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import static org.mockito.Mockito.when;

class LeaderElectorTest {

final static AtomicReference<LeaderElectionRecord> activeLer = new AtomicReference<>(null);

@Test
void runShouldAbortAfterRenewDeadlineExpired() throws Exception {
Expand Down Expand Up @@ -107,6 +109,29 @@ void runShouldEndlesslyRun() throws Exception {
executor.awaitTermination(5, TimeUnit.SECONDS);
verify(lec.getLeaderCallbacks(), times(1)).onStopLeading();
}

@Test
void shouldReleaseWhenCanceled() throws Exception {
// Given
final LeaderElectionConfig lec = mockLeaderElectionConfiguration();
final CountDownLatch signal = new CountDownLatch(1);
final Lock mockedLock = lec.getLock();
when(lec.isReleaseOnCancel()).thenReturn(true);
doAnswer(invocation -> {
activeLer.set(invocation.getArgument(1, LeaderElectionRecord.class));
signal.countDown();
return null;
}).when(mockedLock).update(any(), any());

// When
LeaderElector leaderElector = new LeaderElector(mock(NamespacedKubernetesClient.class), lec, CommonThreadPool.get());
CompletableFuture<?> started = leaderElector.start();
assertTrue(signal.await(10, TimeUnit.SECONDS));
started.cancel(true);

// Then
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> activeLer.get().getLeaseDuration().equals(Duration.ZERO));
}

@Test
void isLeaderAndIsLeaderShouldReturnTrue() {
Expand Down Expand Up @@ -225,7 +250,6 @@ void jitterWithNegativeShouldReturnDuration() {
}

private static LeaderElectionConfig mockLeaderElectionConfiguration() throws Exception {
final AtomicReference<LeaderElectionRecord> activeLer = new AtomicReference<>(null);
final LeaderElectionConfig lec = mock(LeaderElectionConfig.class, Answers.RETURNS_DEEP_STUBS);
when(lec.getLeaseDuration()).thenReturn(Duration.ofSeconds(2L));
when(lec.getRenewDeadline()).thenReturn(Duration.ofSeconds(1L));
Expand Down

0 comments on commit f4cb990

Please sign in to comment.