fix #2860 #2207 notification of loss of leadership and easy cancel
also removes the need to hold a thread when using start, adds more jitter support, and cleans up the retry logic
shawkins authored and manusa committed May 19, 2022
1 parent db6017e commit 042c77b
Showing 11 changed files with 351 additions and 302 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@
### 6.0-SNAPSHOT

#### Bugs
* Fix #2860: ensure that LockExceptions won't inhibit notification
* Fix #3832 #1883: simplifying the isHttpsAvailable check
* Fix #3745: the client will throw better exceptions when a namespace is not discernible for an operation
* Fix #3990: Throw exception when `HasMetadata` is used in `resources(...)` API
@@ -16,6 +17,7 @@
* Fix #3889: remove piped stream for file download
* Fix #1285: removed references to manually calling registerCustomKind
* Fix #3334: adding basic support for server side apply. Use patch(PatchContext.of(PatchType.SERVER_SIDE_APPLY), service), or new PatchContext.Builder().withPatchType(PatchType.SERVER_SIDE_APPLY).withForce(true).build() to override conflicts.
* Fix #2207: added LeaderElector.start to provide a CompletableFuture for easy cancellation.
* Fix #3969: relist will not trigger sync events
* Fix #4082: improving informOnCondition to test the initial list instead of individual add events
* Fix #3968: SharedIndexInformer.initialState can be used to set the store state before the informer starts.
@@ -15,10 +15,12 @@
*/
package io.fabric8.kubernetes.client.extended.leaderelection;

import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaderElectionRecord;
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.Lock;
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LockException;
import io.fabric8.kubernetes.client.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -28,15 +30,13 @@
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class LeaderElector<C extends NamespacedKubernetesClient> {

@@ -48,83 +48,104 @@ public class LeaderElector<C extends NamespacedKubernetesClient> {
private LeaderElectionConfig leaderElectionConfig;
private final AtomicReference<LeaderElectionRecord> observedRecord;
private final AtomicReference<LocalDateTime> observedTime;
private final AtomicReference<String> reportedLeader;

public LeaderElector(C kubernetesClient, LeaderElectionConfig leaderElectionConfig) {
this.kubernetesClient = kubernetesClient;
this.leaderElectionConfig = leaderElectionConfig;
observedRecord = new AtomicReference<>();
observedTime = new AtomicReference<>();
reportedLeader = new AtomicReference<>();
}

/**
* Starts the leader election loop
* <p>
* {@link #start()} is preferred as it does not hold a thread.
*/
public void run() {
LOGGER.debug("Leader election started");
if (!acquire()) {
return;
CompletableFuture<?> acquire = start();
try {
acquire.get();
} catch (InterruptedException e) {
acquire.cancel(true);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
LOGGER.error("Exception during leader election", e);
}
leaderElectionConfig.getLeaderCallbacks().onStartLeading();
renewWithTimeout();
leaderElectionConfig.getLeaderCallbacks().onStopLeading();
}

private boolean acquire() {
/**
* Start a leader elector. The future may be cancelled to stop
* the leader elector.
*
* @return the future
*/
public CompletableFuture<?> start() {
LOGGER.debug("Leader election started");
CompletableFuture<Void> result = new CompletableFuture<>();

CompletableFuture<?> acquireFuture = acquire();

acquireFuture.whenComplete((v, t) -> {
if (t == null) {
CompletableFuture<?> renewFuture = renewWithTimeout();
result.whenComplete((v1, t1) -> renewFuture.cancel(true));
renewFuture.whenComplete((v1, t1) -> {
if (t1 != null) {
result.completeExceptionally(t1);
} else {
result.complete(null);
}
});
}
});
result.whenComplete((v, t) -> {
acquireFuture.cancel(true);
LeaderElectionRecord current = observedRecord.get();
// if cancelled we still want to notify of stopping leadership
if (current != null && Objects.equals(current.getHolderIdentity(), leaderElectionConfig.getLock().identity())) {
leaderElectionConfig.getLeaderCallbacks().onStopLeading();
}
});
return result;
}
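For context, a minimal usage sketch of the new non-blocking API (an editor's illustration, not part of this diff; the LeaderElectionConfigBuilder, LeaderCallbacks and LeaseLock names are assumed from the existing library API):

// Assumed builder/lock/callback APIs - adjust to the actual library version in use.
LeaderElectionConfig config = new LeaderElectionConfigBuilder()
    .withName("sample-election")
    .withLock(new LeaseLock("default", "sample-lease", "pod-identity"))
    .withLeaseDuration(Duration.ofSeconds(15))
    .withRenewDeadline(Duration.ofSeconds(10))
    .withRetryPeriod(Duration.ofSeconds(2))
    .withLeaderCallbacks(new LeaderCallbacks(
        () -> System.out.println("started leading"),
        () -> System.out.println("stopped leading"), // also fired when the future is cancelled while leading
        newLeader -> System.out.println("new leader: " + newLeader)))
    .build();

CompletableFuture<?> election = new LeaderElector<>(client, config).start(); // returns immediately, holds no thread
// ... later, to step down and stop participating:
election.cancel(true);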

private CompletableFuture<Void> acquire() {
final String lockDescription = leaderElectionConfig.getLock().describe();
LOGGER.debug("Attempting to acquire leader lease '{}'...", lockDescription);
final AtomicBoolean succeeded = new AtomicBoolean(false);
return loop(countDownLatch -> {
return loop(completion -> {
try {
if (!succeeded.get()) {
succeeded.set(tryAcquireOrRenew());
reportTransitionIfLeaderChanged();
}
if (succeeded.get()) {
LOGGER.debug("Successfully Acquired leader lease '{}'", lockDescription);
countDownLatch.countDown();
} else {
LOGGER.debug("Failed to acquire lease '{}' retrying...", lockDescription);
if (tryAcquireOrRenew()) {
completion.complete(null);
}
} catch (Exception exception) {
LOGGER.debug("Failed to acquire lease '{}' retrying...", lockDescription);
} catch (LockException | KubernetesClientException exception) {
LOGGER.error("Exception occurred while acquiring lock '{}'", lockDescription, exception);
}
}, jitter(leaderElectionConfig.getRetryPeriod(), JITTER_FACTOR).toMillis());
}, () -> jitter(leaderElectionConfig.getRetryPeriod(), JITTER_FACTOR).toMillis());
}

private void renewWithTimeout() {
private CompletableFuture<Void> renewWithTimeout() {
final String lockDescription = leaderElectionConfig.getLock().describe();
LOGGER.debug("Attempting to renew leader lease '{}'...", lockDescription);
loop(abortLatch -> {
final ExecutorService timeoutExecutorService = Executors.newSingleThreadScheduledExecutor();
final CountDownLatch renewSignal = new CountDownLatch(1);
try {
timeoutExecutorService.submit(() -> renew(abortLatch, renewSignal));
if (!renewSignal.await(leaderElectionConfig.getRenewDeadline().toMillis(), TimeUnit.MILLISECONDS)) {
LOGGER.debug("Renew deadline reached after {} seconds while renewing lock {}",
AtomicLong renewBy = new AtomicLong(System.currentTimeMillis() + leaderElectionConfig.getRenewDeadline().toMillis());
return loop(completion -> {
if (System.currentTimeMillis() > renewBy.get()) {
LOGGER.debug("Renew deadline reached after {} seconds while renewing lock {}",
leaderElectionConfig.getRenewDeadline().get(ChronoUnit.SECONDS), lockDescription);
abortLatch.countDown();
}
} catch(InterruptedException ex) {
Thread.currentThread().interrupt();
} finally {
timeoutExecutorService.shutdown();
completion.complete(null);
return;
}
}, leaderElectionConfig.getRetryPeriod().toMillis());
}

private void renew(CountDownLatch abortLatch, CountDownLatch renewSignal) {
try {
final boolean success = tryAcquireOrRenew();
reportTransitionIfLeaderChanged();
if (!success) {
abortLatch.countDown();
try {
if (tryAcquireOrRenew()) {
renewBy.set(System.currentTimeMillis() + leaderElectionConfig.getRenewDeadline().toMillis());
} else {
// renewal failed, exit
completion.complete(null);
}
} catch (LockException | KubernetesClientException exception) {
LOGGER.debug("Exception occurred while renewing lock: {}", exception.getMessage(), exception);
}
} catch(LockException exception) {
LOGGER.debug("Exception occurred while renewing lock: {}", exception.getMessage(), exception);
}
renewSignal.countDown();
}, () -> leaderElectionConfig.getRetryPeriod().toMillis());
}

private boolean tryAcquireOrRenew() throws LockException {
@@ -133,7 +154,7 @@ private boolean tryAcquireOrRenew() throws LockException {
final LeaderElectionRecord oldLeaderElectionRecord = lock.get(kubernetesClient);
if (oldLeaderElectionRecord == null) {
final LeaderElectionRecord newLeaderElectionRecord = new LeaderElectionRecord(
lock.identity(), leaderElectionConfig.getLeaseDuration(), now, now, 0);
lock.create(kubernetesClient, newLeaderElectionRecord);
updateObserved(newLeaderElectionRecord);
return true;
@@ -145,32 +166,33 @@
return false;
}
final LeaderElectionRecord newLeaderElectionRecord = new LeaderElectionRecord(
    lock.identity(),
    leaderElectionConfig.getLeaseDuration(),
    isLeader ? oldLeaderElectionRecord.getAcquireTime() : now,
    now,
    isLeader ? (oldLeaderElectionRecord.getLeaderTransitions() + 1) : 0);
newLeaderElectionRecord.setVersion(oldLeaderElectionRecord.getVersion());
leaderElectionConfig.getLock().update(kubernetesClient, newLeaderElectionRecord);
updateObserved(newLeaderElectionRecord);
return true;
}

private void updateObserved(LeaderElectionRecord leaderElectionRecord) {
if (!Objects.equals(leaderElectionRecord, observedRecord.get())) {
observedRecord.set(leaderElectionRecord);
final LeaderElectionRecord current = observedRecord.getAndSet(leaderElectionRecord);
if (!Objects.equals(leaderElectionRecord, current)) {
observedTime.set(LocalDateTime.now());
}
}

private void reportTransitionIfLeaderChanged() {
final String currentLeader = reportedLeader.get();
final String newLeader = observedRecord.get().getHolderIdentity();
if (!Objects.equals(newLeader, currentLeader)) {
LOGGER.debug("Leader changed from {} to {}", currentLeader, newLeader);
reportedLeader.set(newLeader);
leaderElectionConfig.getLeaderCallbacks().onNewLeader(newLeader);
final String currentLeader = current == null ? null : current.getHolderIdentity();
final String newLeader = leaderElectionRecord.getHolderIdentity();
if (!Objects.equals(newLeader, currentLeader)) {
LOGGER.debug("Leader changed from {} to {}", currentLeader, newLeader);
leaderElectionConfig.getLeaderCallbacks().onNewLeader(newLeader);
if (Objects.equals(currentLeader, leaderElectionConfig.getLock().identity())) {
leaderElectionConfig.getLeaderCallbacks().onStopLeading();
} else if (Objects.equals(newLeader, leaderElectionConfig.getLock().identity())) {
LOGGER.debug("Successfully Acquired leader lease '{}'", leaderElectionConfig.getLock().describe());
leaderElectionConfig.getLeaderCallbacks().onStartLeading();
}
}
}
}

@@ -183,29 +205,19 @@ protected final boolean canBecomeLeader(LeaderElectionRecord leaderElectionRecor
}

/**
* Periodically (every provided period) runs the provided {@link Consumer} in a separate thread causing the current
* thread to wait until the supplied {@link CountDownLatch} is decremented by 1 unit.
* Periodically (every provided period) runs the provided {@link Consumer} in a separate thread
* until the supplied {@link CompletableFuture} is completed.
*
* @param consumer function to run in a separate thread
* @param periodInMillis to schedule the run of the provided consumer
* @return true if the current thread was not interrupted, false otherwise
* @param delaySupplier to schedule the run of the provided consumer
* @return the future to be completed
*/
protected static boolean loop(Consumer<CountDownLatch> consumer, long periodInMillis) {
final ScheduledExecutorService singleThreadExecutorService = Executors.newSingleThreadScheduledExecutor();
final CountDownLatch countDownLatch = new CountDownLatch(1);
final Future<?> future = singleThreadExecutorService.scheduleAtFixedRate(
() -> consumer.accept(countDownLatch), 0L, periodInMillis, TimeUnit.MILLISECONDS);
try {
countDownLatch.await();
return true;
} catch (InterruptedException e) {
LOGGER.debug("Loop thread interrupted: {}", e.getMessage());
Thread.currentThread().interrupt();
return false;
} finally {
future.cancel(true);
singleThreadExecutorService.shutdownNow();
}
protected static CompletableFuture<Void> loop(Consumer<CompletableFuture<?>> consumer, Supplier<Long> delaySupplier) {
CompletableFuture<Void> completion = new CompletableFuture<>();
Utils.scheduleWithVariableRate(completion, Utils.getCommonExecutorSerive(), () -> consumer.accept(completion), 0,
delaySupplier,
TimeUnit.MILLISECONDS);
return completion;
}
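To illustrate the contract of loop described above, a hypothetical caller (editor's sketch; the stop condition and delay are made up):

// Re-runs the body roughly every 100 ms until the consumer completes the future.
AtomicInteger attempts = new AtomicInteger();
CompletableFuture<Void> done = loop(completion -> {
  if (attempts.incrementAndGet() >= 3) {
    completion.complete(null); // stop condition reached - ends the periodic rescheduling
  }
}, () -> 100L);
done.join(); // returns once the loop has been completed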

protected static ZonedDateTime now() {
@@ -215,7 +227,7 @@ protected static ZonedDateTime now() {
/**
* Returns a {@link Duration} between the provided duration and (duration + maxFactor*duration)
*
* @param duration to apply jitter to
* @param maxFactor for jitter
* @return the jittered duration
*/
@@ -43,16 +43,21 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class Utils {
@@ -464,12 +469,53 @@ public static ScheduledFuture<?> schedule(Executor executor, Runnable command, l

/**
* Schedule a repeated task to run in the given {@link Executor} - which should run the task in a different thread as to not
hold the scheduling thread.
* <p>
* Has the same general contract as {@link ScheduledThreadPoolExecutor#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}
*/
public static ScheduledFuture<?> scheduleAtFixedRate(Executor executor, Runnable command, long initialDelay, long delay,
public static CompletableFuture<?> scheduleAtFixedRate(Executor executor, Runnable command, long initialDelay, long delay,
TimeUnit unit) {
// because of the hand-off to the other executor, there's no difference between rate and delay
return SHARED_SCHEDULER.scheduleWithFixedDelay(() -> executor.execute(command), initialDelay, delay, unit);
CompletableFuture<Void> completion = new CompletableFuture<>();
scheduleWithVariableRate(completion, executor, command, initialDelay, () -> delay, unit);
return completion;
}

/**
* Schedule a repeated task to run in the given {@link Executor} - which should run the task in a different thread as to not
* hold the scheduling thread.
* <p>
*
* @param nextDelay provides the relative next delay - that is the values are applied cumulatively to the initial start
* time. Supplying a fixed value produces a fixed rate.
*/
public static void scheduleWithVariableRate(CompletableFuture<?> completion, Executor executor, Runnable command,
long initialDelay,
Supplier<Long> nextDelay, TimeUnit unit) {
AtomicReference<ScheduledFuture<?>> currentScheduledFuture = new AtomicReference<>();
AtomicLong next = new AtomicLong(unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS) + Math.max(0, initialDelay));
schedule(() -> CompletableFuture.runAsync(command, executor), initialDelay, unit, completion, nextDelay, next,
currentScheduledFuture);
// remove on cancel is true, so this may proactively clean up
completion.whenComplete((v, t) -> Optional.ofNullable(currentScheduledFuture.get()).ifPresent(s -> s.cancel(true)));
}
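A small usage sketch of the new variable-rate scheduling (editor's illustration; the executor choice and delays are arbitrary):

CompletableFuture<Void> stop = new CompletableFuture<>();
Random random = new Random();
Utils.scheduleWithVariableRate(stop, ForkJoinPool.commonPool(), () -> System.out.println("tick"),
    0, // first run immediately
    () -> 100L + random.nextInt(100), // each next run is scheduled 100-199 ms after the previous scheduled start
    TimeUnit.MILLISECONDS);
// ... later, completing (or cancelling) the future stops the repetition:
stop.complete(null);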

private static void schedule(Supplier<CompletableFuture<?>> runner, long delay, TimeUnit unit,
CompletableFuture<?> completion, Supplier<Long> nextDelay, AtomicLong next,
AtomicReference<ScheduledFuture<?>> currentScheduledFuture) {
currentScheduledFuture.set(SHARED_SCHEDULER.schedule(() -> {
if (completion.isDone()) {
return;
}
CompletableFuture<?> runAsync = runner.get();
runAsync.whenComplete((v, t) -> {
if (t != null) {
completion.completeExceptionally(t);
} else if (!completion.isDone()) {
schedule(runner, next.addAndGet(nextDelay.get()) - unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS),
unit, completion, nextDelay, next, currentScheduledFuture);
}
});
}, delay, unit));
}

