Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handler for notifying that Reconcile for Object that existed before DefaultController is started is completed #2803

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -58,21 +60,25 @@ public class DefaultController implements Controller {

private Duration readyTimeout;
private Duration readyCheckInternal;
private final Runnable initialCatchUpCompleteHandler;

/**
* Instantiates a new Default controller.
*
* @param name the name
* @param reconciler the reconciler
* @param workQueue the work queue
* @param initialCatchUpCompleteHandler the handler
* @param readyFuncs the ready funcs
*/
public DefaultController(
String name,
Reconciler reconciler,
RateLimitingQueue<Request> workQueue,
Supplier<Boolean>... readyFuncs) {
this(name, reconciler, workQueue, null, readyFuncs);
Runnable initialCatchUpCompleteHandler,
Supplier<Boolean>... readyFuncs
) {
this(name, reconciler, workQueue, null, initialCatchUpCompleteHandler, readyFuncs);
}

/**
Expand All @@ -89,13 +95,16 @@ public DefaultController(
Reconciler reconciler,
RateLimitingQueue<Request> workQueue,
CollectorRegistry collectorRegistry,
Supplier<Boolean>... readyFuncs) {
Runnable initialCatchUpCompleteHandler,
Supplier<Boolean>... readyFuncs
) {
this.name = name;
this.reconciler = reconciler;
this.workQueue = workQueue;
this.readyFuncs = readyFuncs;
this.readyTimeout = Duration.ofSeconds(30);
this.readyCheckInternal = Duration.ofSeconds(1);
this.initialCatchUpCompleteHandler = initialCatchUpCompleteHandler;
}

// preFlightCheck checks if the controller is ready for working.
Expand Down Expand Up @@ -143,6 +152,13 @@ public void run() {
return;
}

// fixes initial request set.
Set<Request> initialRequests = new HashSet<>(workQueue.getItemsNeedToBeProcessed());
if (initialRequests.isEmpty() && initialCatchUpCompleteHandler != null) {
// If there is nothing in the work queue, it has already caught up, so the handler will be triggered immediately
initialCatchUpCompleteHandler.run();
}

// spawns worker threads for the controller.
CountDownLatch latch = new CountDownLatch(workerCount);
for (int i = 0; i < this.workerCount; i++) {
Expand All @@ -151,7 +167,7 @@ public void run() {
() -> {
log.debug("Starting controller {} worker {}..", this.name, workerIndex);
try {
this.worker();
this.worker(initialRequests);
} catch (Throwable t) {
log.error("Unexpected controller loop abortion", t);
} finally {
Expand Down Expand Up @@ -180,7 +196,7 @@ public void shutdown() {
workerThreadPool.shutdown();
}

private void worker() {
private void worker(Set<Request> initialRequests) {
// taking tasks from work-queue in a loop
while (!workQueue.isShuttingDown()) {
gaugeWorkQueueLength.labels(name).set(workQueue.length());
Expand Down Expand Up @@ -231,6 +247,23 @@ private void worker() {
}
} finally {
workQueue.done(request);

// Removes a request from the set as processed only if it successfully reconciles
if (!result.isRequeue()) {
synchronized (initialRequests) {
if (initialRequests.remove(request) &&
initialRequests.isEmpty() &&
initialCatchUpCompleteHandler != null) {
try {
// Triggers the handler only once when all requests queued at the start have been successfully processed
initialCatchUpCompleteHandler.run();
} catch (Throwable t) {
log.error("initialCatchUpCompleteHandler failed unexpectedly", t);
}
}
}
}

gaugeWorkQueueLength.labels(name).set(workQueue.length());
log.debug("Controller {} finished reconciling {}..", this.name, request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class DefaultControllerBuilder {
private SharedInformerFactory informerFactory;
private List<Supplier<Boolean>> readyFuncs;
private Reconciler reconciler;
private Runnable initialCatchUpCompleteHandler;

DefaultControllerBuilder() {
this.workerCount = Constants.DEFAULT_WORKER_COUNT;
Expand Down Expand Up @@ -146,6 +147,16 @@ public DefaultControllerBuilder withReconciler(Reconciler reconciler) {
return this;
}

/**
* Set a handler that will be triggered only once when all requests queued at the start have been successfully processed.
* @param initialCatchUpCompleteHandler the handler
* @return the controller builder
*/
public DefaultControllerBuilder withInitialCatchUpCompleteHandler(Runnable initialCatchUpCompleteHandler) {
this.initialCatchUpCompleteHandler = initialCatchUpCompleteHandler;
return this;
}

/**
* Build the controller.
*
Expand All @@ -162,6 +173,7 @@ public Controller build() throws IllegalStateException {
this.controllerName,
this.reconciler,
this.workQueue,
this.initialCatchUpCompleteHandler,
this.readyFuncs.stream().toArray(Supplier[]::new));

if (this.readyTimeout != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import io.kubernetes.client.extended.workqueue.ratelimiter.DefaultControllerRateLimiter;
import io.kubernetes.client.extended.workqueue.ratelimiter.RateLimiter;

import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand Down Expand Up @@ -50,4 +52,10 @@ public void forget(T item) {
public void addRateLimited(T item) {
super.addAfter(item, rateLimiter.when(item));
}


@Override
public Set<T> getItemsNeedToBeProcessed() {
return getDirtySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,9 @@ public synchronized void shutDown() {
public synchronized boolean isShuttingDown() {
return shuttingDown;
}

public synchronized Set<T> getDirtySet() {
return new HashSet<>(this.dirty);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package io.kubernetes.client.extended.workqueue;

import java.util.Set;

/** RateLimitingQueue defines a queue that rate limits items being added to the queue. */
public interface RateLimitingQueue<T> extends DelayingQueue<T> {

Expand All @@ -38,4 +40,10 @@ public interface RateLimitingQueue<T> extends DelayingQueue<T> {
* @return times the item was requeued
*/
int numRequeues(T item);

/**
* getItemsNeedToBeProcessed returns the set of Items that need to be processed.
* @return items that need to be processed
*/
Set<T> getItemsNeedToBeProcessed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package io.kubernetes.client.extended.controller;

import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;

import io.kubernetes.client.extended.controller.reconciler.Reconciler;
Expand Down Expand Up @@ -64,7 +65,7 @@ public void tearDown() throws Exception {}
@Test(timeout = 90000)
public void testStartingStoppingController() throws InterruptedException {

DefaultController testController = new DefaultController("", mockReconciler, workQueue);
DefaultController testController = new DefaultController("", mockReconciler, workQueue, null);

testController.setWorkerCount(1);
testController.setWorkerThreadPool(Executors.newScheduledThreadPool(1));
Expand Down Expand Up @@ -120,7 +121,7 @@ public Object answer(InvocationOnMock invocation) {
latch.acquire();
AtomicBoolean ready = new AtomicBoolean(false);
DefaultController testController =
new DefaultController("", mockReconciler, workQueue, () -> ready.get());
new DefaultController("", mockReconciler, workQueue, null, (Runnable) null, () -> ready.get());
testController.setWorkerCount(1);
testController.setWorkerThreadPool(Executors.newScheduledThreadPool(1));
testController.setReadyCheckInternal(Duration.ofMillis(100));
Expand Down Expand Up @@ -164,7 +165,8 @@ public Result reconcile(Request request) {
}
}
},
workQueue);
workQueue,
null);
testController.setWorkerCount(1);
testController.setWorkerThreadPool(Executors.newScheduledThreadPool(1));

Expand All @@ -184,4 +186,66 @@ public Result reconcile(Request request) {
assertTrue(resumed.get());
assertTrue(finishedRequests.size() >= 1);
}

@Test(timeout = 90000)
public void testControllerTriggersInitialCatchUpCompleteHandlerWhenReconcilerCaughtUp()
throws InterruptedException {
final Semaphore handlerLatch = new Semaphore(1);
Runnable initialCatchUpCompleteHandler = spy(new Runnable() {
@Override
public void run() {
handlerLatch.release();
}
});
handlerLatch.acquire();

final Semaphore latch = new Semaphore(1);
latch.acquire();
Reconciler spyReconciler = spy(new Reconciler() {
@Override
public Result reconcile(Request request) {
try {
return new Result(false);
} finally {
latch.release();
}
}
});
DefaultController testController = new DefaultController(
"", spyReconciler, workQueue, initialCatchUpCompleteHandler);

testController.setWorkerCount(1);
testController.setWorkerThreadPool(Executors.newScheduledThreadPool(1));

Request request1 = new Request("test1");
workQueue.add(request1);
verify(initialCatchUpCompleteHandler, times(0)).run();

controllerThead.submit(testController::run);
latch.acquire();
verify(spyReconciler, times(1)).reconcile(request1);

handlerLatch.acquire();
verify(initialCatchUpCompleteHandler, times(1)).run();
reset(initialCatchUpCompleteHandler);

Request request2 = new Request("test2");
verify(spyReconciler, times(0)).reconcile(request2);

workQueue.add(request2);
latch.acquire();
verify(spyReconciler, times(1)).reconcile(request2);
// Sleeps to make sure the handler isn't called.
cooldown();
verify(initialCatchUpCompleteHandler, times(0)).run();

testController.shutdown();

// Make sure the handler is called immediately if there is nothing to reconcile
assertEquals(0, workQueue.length());
controllerThead.submit(testController::run);
handlerLatch.acquire();
verify(initialCatchUpCompleteHandler, times(1)).run();
}

}