From 65d3282e666673dc0e557ebf0ce2790a56e533e6 Mon Sep 17 00:00:00 2001 From: Chris Rankin Date: Sun, 23 Aug 2020 13:08:16 +0100 Subject: [PATCH] Refactor waitUntilCondition() for lists to use common fork-join pool. This should be less resource-intensive than using a new thread for every list item, regardless of the size of the list. Also revert previous change for preserving thread interrupt status, because the thread is exiting anyway. --- .../client/dsl/base/BaseOperation.java | 2 +- ...hDeleteRecreateWaitApplicableListImpl.java | 84 +++++++------------ .../v1beta1/DeploymentRollingUpdater.java | 2 +- .../v1beta1/ReplicaSetRollingUpdater.java | 2 +- .../client/utils/PodOperationUtilTest.java | 2 +- 5 files changed, 35 insertions(+), 57 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java index 728d60aa68..6b1b4eff70 100755 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java @@ -524,7 +524,7 @@ public FilterWatchListDeletable withField(String key, Stri } @Override - public FilterWatchListDeletable> withInvolvedObject(ObjectReference objectReference) { + public FilterWatchListDeletable withInvolvedObject(ObjectReference objectReference) { if (objectReference != null) { if (objectReference.getName() != null) { fields.put(INVOLVED_OBJECT_NAME, objectReference.getName()); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java index 4b89504d22..95f707fb12 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java @@ -38,8 +38,6 @@ import io.fabric8.kubernetes.client.utils.Serialization; import io.fabric8.kubernetes.client.utils.Utils; -import java.net.HttpURLConnection; -import java.util.function.Predicate; import okhttp3.OkHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +45,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.net.HttpURLConnection; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; @@ -54,12 +53,14 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; public class NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl extends OperationSupport implements ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, Waitable, HasMetadata>, Readiable { @@ -139,34 +140,27 @@ public List waitUntilCondition(Predicate condition, } final List> futures = new ArrayList<>(items.size()); - final ExecutorService executor = Executors.newFixedThreadPool(items.size()); + for (final HasMetadata meta : items) { + final ResourceHandler h = handlerOf(meta); + futures.add(CompletableFuture.supplyAsync(() -> { + try { + return h.waitUntilCondition(client, config, meta.getMetadata().getNamespace(), meta, condition, amount, timeUnit); + } catch (Exception e) { + //consider all errors as not ready. + logAsNotReady(e, meta); + return null; + } + })); + } try { - for (final HasMetadata meta : items) { - final ResourceHandler h = handlerOf(meta); - futures.add(CompletableFuture.supplyAsync(() -> { - try { - return h.waitUntilCondition(client, config, meta.getMetadata().getNamespace(), meta, condition, amount, timeUnit); - } catch (InterruptedException e) { - // Don't forget that this thread was interrupted. - Thread.currentThread().interrupt(); - - //consider all errors as not ready. - logAsNotReady(e, meta); - return null; - } catch (Exception e) { - //consider all errors as not ready. - logAsNotReady(e, meta); - return null; - } - }, executor)); - } - - // Wait for all futures to complete, remembering that every future - // has been given the same timeout value. - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); - } finally { - executor.shutdown(); + // All of the individual futures have the same timeout period, + // but they may not all necessarily start executing together. + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(amount, timeUnit); + } catch (TimeoutException | ExecutionException e) { + // We don't allow individual futures to complete with an exception, + // which means we should never catch ExecutionException here. + LOGGER.debug("Global timeout reached", e); } final List results = new ArrayList<>(); @@ -177,13 +171,19 @@ public List waitUntilCondition(Predicate condition, int i = 0; for (final HasMetadata meta : items) { try { - HasMetadata result = futures.get(i).get(); + CompletableFuture future = futures.get(i); + HasMetadata result = future.getNow(null); if (result != null) { results.add(result); } else { + // Cancel this future, just in case it never had + // an opportunity to execute in the first place. + future.cancel(true); itemsWithConditionNotMatched.add(meta); } - } catch (ExecutionException e) { + } catch (CompletionException e) { + // We should never reach here, because individual futures + // aren't allowed to complete with an exception. itemsWithConditionNotMatched.add(meta); logAsNotReady(e.getCause(), meta); } @@ -455,26 +455,4 @@ private static ResourceHandler handlerOf(T item) { throw new IllegalArgumentException("Could not find a registered handler for item: [" + item + "]."); } } - - /** - * Waits until the latch reaches to zero and then checks if the expected result - * @param latch The latch. - * @param expected The expected number. - * @param actual The actual number. - * @param amount The amount of time to wait. - * @param timeUnit The timeUnit. - * @return - */ - private static boolean checkConditionMetForAll(CountDownLatch latch, int expected, AtomicInteger actual, long amount, TimeUnit timeUnit) { - try { - if (latch.await(amount, timeUnit)) { - return actual.intValue() == expected; - } - return false; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return false; - } - } - } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentRollingUpdater.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentRollingUpdater.java index a1f730e9e0..90cf074ec5 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentRollingUpdater.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentRollingUpdater.java @@ -58,7 +58,7 @@ protected Deployment createClone(Deployment obj, String newName, String newDeplo @Override protected PodList listSelectedPods(Deployment obj) { - FilterWatchListDeletable> podLister = pods().inNamespace(namespace); + FilterWatchListDeletable podLister = pods().inNamespace(namespace); if (obj.getSpec().getSelector().getMatchLabels() != null) { podLister.withLabels(obj.getSpec().getSelector().getMatchLabels()); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/ReplicaSetRollingUpdater.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/ReplicaSetRollingUpdater.java index 6bdcbd3d9c..82ed5b8637 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/ReplicaSetRollingUpdater.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/ReplicaSetRollingUpdater.java @@ -58,7 +58,7 @@ protected ReplicaSet createClone(ReplicaSet obj, String newName, String newDeplo @Override protected PodList listSelectedPods(ReplicaSet obj) { - FilterWatchListDeletable> podLister = pods().inNamespace(namespace); + FilterWatchListDeletable podLister = pods().inNamespace(namespace); if (obj.getSpec().getSelector().getMatchLabels() != null) { podLister.withLabels(obj.getSpec().getSelector().getMatchLabels()); } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/utils/PodOperationUtilTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/utils/PodOperationUtilTest.java index 1a98b23bdc..7f231931bc 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/utils/PodOperationUtilTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/utils/PodOperationUtilTest.java @@ -184,7 +184,7 @@ private FilterWatchListDeletable getMockPodFilterO public FilterWatchListDeletable withLabelSelector(LabelSelector selector) { return null; } @Override - public FilterWatchListDeletable> withInvolvedObject(ObjectReference objectReference) { return null; } + public FilterWatchListDeletable withInvolvedObject(ObjectReference objectReference) { return null; } @Override public PodList list() { return getMockPodList(controllerUid); }