From 698f1d03990b908762ce141904ccd37c32dfd02c Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 11 Jan 2021 16:15:33 +0100 Subject: [PATCH 01/10] refactor: provide customizable close implementation --- .../client/dsl/internal/AbstractWatchManager.java | 12 ++++++++++++ .../dsl/internal/RawWatchConnectionManager.java | 8 ++------ .../client/dsl/internal/WatchConnectionManager.java | 8 ++------ .../client/dsl/internal/WatchHTTPManager.java | 7 ------- 4 files changed, 16 insertions(+), 19 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index ff73a6d4fce..773263639a6 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -137,4 +137,16 @@ static void closeWebSocket(WebSocket webSocket) { } } } + + @Override + public void close() { + logger.debug("Force closing the watch {}", this); + closeEvent(); + internalClose(); + closeExecutorService(); + } + + protected void internalClose() { + // default implementation does nothing + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java index 0780810b62b..2ffba7c93d3 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java @@ -231,13 +231,9 @@ public void execute() { public void waitUntilReady() { Utils.waitUntilReady(queue, 10, TimeUnit.SECONDS); } - + @Override - public void close() { - logger.debug("Force closing the watch {}", this); - closeEvent(); + protected void internalClose() { closeWebSocket(webSocketRef.getAndSet(null)); - closeExecutorService(); } - } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index eb172cd4607..b9237392d9d 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -303,13 +303,9 @@ public void execute() { public void waitUntilReady() { Utils.waitUntilReady(queue, 10, TimeUnit.SECONDS); } - + @Override - public void close() { - logger.debug("Force closing the watch {}", this); - closeEvent(); + protected void internalClose() { closeWebSocket(webSocketRef.getAndSet(null)); - closeExecutorService(); } - } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 1a7acb9be38..ba37c12677d 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -300,11 +300,4 @@ protected static WatchEvent readWatchEvent(String messageSource) { } return event; } - - @Override - public void close() { - logger.debug("Force closing the watch {}", this); - closeEvent(); - closeExecutorService(); - } } From 4348f9df987f018ccaa144c9b1be9e4812662a59 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 11 Jan 2021 17:47:54 +0100 Subject: [PATCH 02/10] refactor: extract RequestBuilder interface to reuse more code --- .../dsl/internal/AbstractWatchManager.java | 14 ++- .../internal/RawWatchConnectionManager.java | 45 +++++--- .../dsl/internal/WatchConnectionManager.java | 100 ++++++++++-------- .../client/dsl/internal/WatchHTTPManager.java | 50 +-------- .../internal/AbstractWatchManagerTest.java | 3 +- 5 files changed, 100 insertions(+), 112 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index 773263639a6..4c777e74026 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -21,6 +21,7 @@ import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; import okhttp3.OkHttpClient; +import okhttp3.Request; import okhttp3.WebSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +38,6 @@ public abstract class AbstractWatchManager implements Watch { private static final Logger logger = LoggerFactory.getLogger(AbstractWatchManager.class); final Watcher watcher; - final ListOptions listOptions; final AtomicReference resourceVersion; final OkHttpClient clonedClient; @@ -47,14 +47,15 @@ public abstract class AbstractWatchManager implements Watch { private final int maxIntervalExponent; final AtomicInteger currentReconnectAttempt; private final ScheduledExecutorService executorService; + + protected final RequestBuilder requestBuilder; AbstractWatchManager( Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, - OkHttpClient clonedClient + OkHttpClient clonedClient, RequestBuilder requestBuilder ) { this.watcher = watcher; - this.listOptions = listOptions; this.reconnectLimit = reconnectLimit; this.reconnectInterval = reconnectInterval; this.maxIntervalExponent = maxIntervalExponent; @@ -67,6 +68,8 @@ public abstract class AbstractWatchManager implements Watch { ret.setDaemon(true); return ret; }); + + this.requestBuilder = requestBuilder; } final void closeEvent(WatcherException cause) { @@ -149,4 +152,9 @@ public void close() { protected void internalClose() { // default implementation does nothing } + + @FunctionalInterface + interface RequestBuilder { + Request build(final String resourceVersion); + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java index 2ffba7c93d3..82bf96b9141 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java @@ -52,7 +52,6 @@ public class RawWatchConnectionManager extends AbstractWatchManager { private static final Logger logger = LoggerFactory.getLogger(RawWatchConnectionManager.class); private ObjectMapper objectMapper; - private HttpUrl.Builder watchUrlBuilder; private final AtomicReference webSocketRef = new AtomicReference<>(); /** True if an onOpen callback was received on the first connect attempt, ie. the watch was successfully started. */ @@ -64,30 +63,42 @@ public class RawWatchConnectionManager extends AbstractWatchManager { public RawWatchConnectionManager(OkHttpClient okHttpClient, HttpUrl.Builder watchUrlBuilder, ListOptions listOptions, ObjectMapper objectMapper, final Watcher watcher, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) { super( watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, - okHttpClient.newBuilder().build() + okHttpClient.newBuilder().build(), new RawRequestBuilder(watchUrlBuilder) ); - this.watchUrlBuilder = watchUrlBuilder; this.objectMapper = objectMapper; runWatch(); } - - private void runWatch() { - if (resourceVersion.get() != null) { - watchUrlBuilder.removeAllQueryParameters("resourceVersion"); - watchUrlBuilder.addQueryParameter("resourceVersion", resourceVersion.get()); + + static class RawRequestBuilder implements RequestBuilder { + private final HttpUrl.Builder watchUrlBuilder; + + public RawRequestBuilder(HttpUrl.Builder watchUrlBuilder) { + this.watchUrlBuilder = watchUrlBuilder; } - HttpUrl watchUrl = watchUrlBuilder.build(); - String origin = watchUrl.url().getProtocol() + "://" + watchUrl.url().getHost(); - if (watchUrl.url().getPort() != -1) { - origin += ":" + watchUrl.url().getPort(); + + @Override + public Request build(String resourceVersion) { + if (resourceVersion != null) { + watchUrlBuilder.removeAllQueryParameters("resourceVersion"); + watchUrlBuilder.addQueryParameter("resourceVersion", resourceVersion); + } + HttpUrl watchUrl = watchUrlBuilder.build(); + String origin = watchUrl.url().getProtocol() + "://" + watchUrl.url().getHost(); + if (watchUrl.url().getPort() != -1) { + origin += ":" + watchUrl.url().getPort(); + } + + return new Request.Builder() + .get() + .url(watchUrl) + .addHeader("Origin", origin) + .build(); } + } - Request request = new Request.Builder() - .get() - .url(watchUrl) - .addHeader("Origin", origin) - .build(); + private void runWatch() { + Request request = requestBuilder.build(resourceVersion.get()); clonedClient.newWebSocket(request, new WebSocketListener() { @Override public void onOpen(WebSocket webSocket, Response response) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index b9237392d9d..b34803075a1 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -54,29 +54,21 @@ public class WatchConnectionManager> extends AbstractWatchManager { private static final Logger logger = LoggerFactory.getLogger(WatchConnectionManager.class); - - private final BaseOperation baseOperation; + private final AtomicReference webSocketRef = new AtomicReference<>(); /** True if an onOpen callback was received on the first connect attempt, ie. the watch was successfully started. */ private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean reconnectPending = new AtomicBoolean(false); /** Blocking queue for startup exceptions. */ private final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1); - private final URL requestUrl; public WatchConnectionManager(final OkHttpClient client, final BaseOperation baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout, int maxIntervalExponent) throws MalformedURLException { super( watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, client.newBuilder() .readTimeout(websocketTimeout, TimeUnit.MILLISECONDS) - .build() + .build(), new BaseOperationRequestBuilder(baseOperation, listOptions) ); - this.baseOperation = baseOperation; - - // The URL is created, validated and saved once, so that reconnect attempts don't have to deal with - // MalformedURLExceptions that would never occur - - requestUrl = baseOperation.getNamespacedUrl(); runWatch(); } @@ -86,42 +78,9 @@ public WatchConnectionManager(final OkHttpClient client, final BaseOperation 0) { - if (fieldQueryString.length() > 0) { - fieldQueryString += ","; - } - fieldQueryString += "metadata.name=" + name; - } - if (Utils.isNotNullOrEmpty(fieldQueryString)) { - httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString); - } - - listOptions.setResourceVersion(resourceVersion.get()); - HttpClientUtils.appendListOptionParams(httpUrlBuilder, listOptions); - - String origin = requestUrl.getProtocol() + "://" + requestUrl.getHost(); - if (requestUrl.getPort() != -1) { - origin += ":" + requestUrl.getPort(); - } - - Request request = new Request.Builder() - .get() - .url(httpUrlBuilder.build()) - .addHeader("Origin", origin) - .build(); - + final Request request = requestBuilder.build(resourceVersion.get()); + logger.debug("Watching {}...", request.url()); + clonedClient.newWebSocket(request, new WebSocketListener() { @Override public void onOpen(final WebSocket webSocket, Response response) { @@ -308,4 +267,53 @@ public void waitUntilReady() { protected void internalClose() { closeWebSocket(webSocketRef.getAndSet(null)); } + + static class BaseOperationRequestBuilder implements RequestBuilder { + private final URL requestUrl; + private final BaseOperation baseOperation; + private final ListOptions listOptions; + + public BaseOperationRequestBuilder(BaseOperation baseOperation, ListOptions listOptions) throws MalformedURLException { + this.baseOperation = baseOperation; + this.requestUrl = baseOperation.getNamespacedUrl(); + this.listOptions = listOptions; + } + + @Override + public Request build(final String resourceVersion) { + HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder(); + + String labelQueryParam = baseOperation.getLabelQueryParam(); + if (Utils.isNotNullOrEmpty(labelQueryParam)) { + httpUrlBuilder.addQueryParameter("labelSelector", labelQueryParam); + } + + String fieldQueryString = baseOperation.getFieldQueryParam(); + String name = baseOperation.getName(); + + if (name != null && name.length() > 0) { + if (fieldQueryString.length() > 0) { + fieldQueryString += ","; + } + fieldQueryString += "metadata.name=" + name; + } + if (Utils.isNotNullOrEmpty(fieldQueryString)) { + httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString); + } + + listOptions.setResourceVersion(resourceVersion); + HttpClientUtils.appendListOptionParams(httpUrlBuilder, listOptions); + + String origin = requestUrl.getProtocol() + "://" + requestUrl.getHost(); + if (requestUrl.getPort() != -1) { + origin += ":" + requestUrl.getPort(); + } + + return new Request.Builder() + .get() + .url(httpUrlBuilder.build()) + .addHeader("Origin", origin) + .build(); + } + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index ba37c12677d..2ecaf7f76d1 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -57,11 +57,8 @@ public class WatchHTTPManager> extends AbstractWatchManager { private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class); - private final BaseOperation baseOperation; - private final AtomicBoolean reconnectPending = new AtomicBoolean(false); - private final URL requestUrl; - + public WatchHTTPManager(final OkHttpClient client, final BaseOperation baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, @@ -83,11 +80,9 @@ public WatchHTTPManager(final OkHttpClient client, .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) .readTimeout(0, TimeUnit.MILLISECONDS) .cache(null) - .build() + .build(), new WatchConnectionManager.BaseOperationRequestBuilder(baseOperation, listOptions) ); - this.baseOperation = baseOperation; - - + // If we set the HttpLoggingInterceptor's logging level to Body (as it is by default), it does // not let us stream responses from the server. for (Interceptor i : clonedClient.networkInterceptors()) { @@ -97,47 +92,12 @@ public WatchHTTPManager(final OkHttpClient client, } } - requestUrl = baseOperation.getNamespacedUrl(); runWatch(); } private void runWatch() { - HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder(); - String labelQueryParam = baseOperation.getLabelQueryParam(); - if (Utils.isNotNullOrEmpty(labelQueryParam)) { - httpUrlBuilder.addQueryParameter("labelSelector", labelQueryParam); - } - - String fieldQueryString = baseOperation.getFieldQueryParam(); - String name = baseOperation.getName(); - - if (name != null && name.length() > 0) { - if (fieldQueryString.length() > 0) { - fieldQueryString += ","; - } - fieldQueryString += "metadata.name=" + name; - } - - if (Utils.isNotNullOrEmpty(fieldQueryString)) { - httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString); - } - - listOptions.setResourceVersion(resourceVersion.get()); - HttpClientUtils.appendListOptionParams(httpUrlBuilder, listOptions); - String origin = requestUrl.getProtocol() + "://" + requestUrl.getHost(); - if (requestUrl.getPort() != -1) { - origin += ":" + requestUrl.getPort(); - } - - HttpUrl url = httpUrlBuilder.build(); - - logger.debug("Watching via HTTP GET {}", url); - - final Request request = new Request.Builder() - .get() - .url(url) - .addHeader("Origin", origin) - .build(); + final Request request = requestBuilder.build(resourceVersion.get()); + logger.debug("Watching {}...", request.url()); clonedClient.newCall(request).enqueue(new Callback() { @Override diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java index 023d8d5ff09..cfa29824c63 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java @@ -19,6 +19,7 @@ import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; import okhttp3.OkHttpClient; +import okhttp3.Request; import okhttp3.WebSocket; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -212,7 +213,7 @@ public void onClose() { private static final class WatchManager extends AbstractWatchManager { public WatchManager(Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, OkHttpClient clonedClient) { - super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, clonedClient); + super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, clonedClient, resourceVersion -> null); } @Override public void close() { From 1b4fb99f8565d860c9fd05c92008cbfd74678299 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 11 Jan 2021 22:22:55 +0100 Subject: [PATCH 03/10] refactor: introduce ClientRunner concept to share more code --- .../dsl/internal/AbstractWatchManager.java | 63 +++- .../internal/BaseOperationRequestBuilder.java | 74 ++++ .../dsl/internal/RawRequestBuilder.java | 46 +++ .../internal/RawWatchConnectionManager.java | 228 ++----------- .../dsl/internal/WatchConnectionManager.java | 322 ++++-------------- .../client/dsl/internal/WatchHTTPManager.java | 317 +++++++++-------- .../internal/WatcherWebSocketListener.java | 183 ++++++++++ .../dsl/internal/WebSocketClientRunner.java | 52 +++ 8 files changed, 675 insertions(+), 610 deletions(-) create mode 100644 kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java create mode 100644 kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawRequestBuilder.java create mode 100644 kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java create mode 100644 kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index 4c777e74026..45621043f52 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -48,7 +48,8 @@ public abstract class AbstractWatchManager implements Watch { final AtomicInteger currentReconnectAttempt; private final ScheduledExecutorService executorService; - protected final RequestBuilder requestBuilder; + private final RequestBuilder requestBuilder; + protected ClientRunner runner; AbstractWatchManager( @@ -71,6 +72,13 @@ public abstract class AbstractWatchManager implements Watch { this.requestBuilder = requestBuilder; } + + protected void initRunner(ClientRunner runner) { + if (this.runner != null) { + throw new IllegalStateException("ClientRunner has already been initialized"); + } + this.runner = runner; + } final void closeEvent(WatcherException cause) { if (forceClosed.getAndSet(true)) { @@ -127,6 +135,37 @@ final long nextReconnectInterval() { logger.debug("Current reconnect backoff is {} milliseconds (T{})", ret, exponentOfTwo); return ret; } + + void resetReconnectAttempts() { + currentReconnectAttempt.set(0); + } + + boolean isForceClosed() { + return forceClosed.get(); + } + + void eventReceived(Watcher.Action action, T resource) { + watcher.eventReceived(action, resource); + } + + void onClose(WatcherException cause) { + watcher.onClose(cause); + } + + void updateResourceVersion(final String newResourceVersion) { + resourceVersion.set(newResourceVersion); + } + + protected void runWatch() { + final Request request = requestBuilder.build(resourceVersion.get()); + logger.debug("Watching {}...", request.url()); + + runner.run(request); + } + + public void waitUntilReady() { + runner.waitUntilReady(); + } static void closeWebSocket(WebSocket webSocket) { if (webSocket != null) { @@ -145,16 +184,28 @@ static void closeWebSocket(WebSocket webSocket) { public void close() { logger.debug("Force closing the watch {}", this); closeEvent(); - internalClose(); + runner.close(); closeExecutorService(); } - protected void internalClose() { - // default implementation does nothing - } - @FunctionalInterface interface RequestBuilder { Request build(final String resourceVersion); } + + static abstract class ClientRunner { + private final OkHttpClient client; + + public ClientRunner(OkHttpClient client) { + this.client = cloneAndCustomize(client); + } + + abstract void run(Request request); + void close() {} + abstract void waitUntilReady(); + abstract OkHttpClient cloneAndCustomize(OkHttpClient client); + OkHttpClient client() { + return client; + } + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java new file mode 100644 index 00000000000..9cc7f9d0944 --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + */ +package io.fabric8.kubernetes.client.dsl.internal; + +import java.net.MalformedURLException; +import java.net.URL; + +import io.fabric8.kubernetes.api.model.ListOptions; +import io.fabric8.kubernetes.client.dsl.base.BaseOperation; +import io.fabric8.kubernetes.client.utils.HttpClientUtils; +import io.fabric8.kubernetes.client.utils.Utils; +import okhttp3.HttpUrl; +import okhttp3.Request; + +class BaseOperationRequestBuilder implements AbstractWatchManager.RequestBuilder { + private final URL requestUrl; + private final BaseOperation baseOperation; + private final ListOptions listOptions; + + public BaseOperationRequestBuilder(BaseOperation baseOperation, ListOptions listOptions) throws MalformedURLException { + this.baseOperation = baseOperation; + this.requestUrl = baseOperation.getNamespacedUrl(); + this.listOptions = listOptions; + } + + @Override + public Request build(final String resourceVersion) { + HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder(); + + String labelQueryParam = baseOperation.getLabelQueryParam(); + if (Utils.isNotNullOrEmpty(labelQueryParam)) { + httpUrlBuilder.addQueryParameter("labelSelector", labelQueryParam); + } + + String fieldQueryString = baseOperation.getFieldQueryParam(); + String name = baseOperation.getName(); + + if (name != null && name.length() > 0) { + if (fieldQueryString.length() > 0) { + fieldQueryString += ","; + } + fieldQueryString += "metadata.name=" + name; + } + if (Utils.isNotNullOrEmpty(fieldQueryString)) { + httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString); + } + + listOptions.setResourceVersion(resourceVersion); + HttpClientUtils.appendListOptionParams(httpUrlBuilder, listOptions); + + String origin = requestUrl.getProtocol() + "://" + requestUrl.getHost(); + if (requestUrl.getPort() != -1) { + origin += ":" + requestUrl.getPort(); + } + + return new Request.Builder() + .get() + .url(httpUrlBuilder.build()) + .addHeader("Origin", origin) + .build(); + } +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawRequestBuilder.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawRequestBuilder.java new file mode 100644 index 00000000000..5434865aded --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawRequestBuilder.java @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.dsl.internal; + +import okhttp3.HttpUrl; +import okhttp3.Request; + +class RawRequestBuilder implements AbstractWatchManager.RequestBuilder { + private final HttpUrl.Builder watchUrlBuilder; + + public RawRequestBuilder(HttpUrl.Builder watchUrlBuilder) { + this.watchUrlBuilder = watchUrlBuilder; + } + + @Override + public Request build(String resourceVersion) { + if (resourceVersion != null) { + watchUrlBuilder.removeAllQueryParameters("resourceVersion"); + watchUrlBuilder.addQueryParameter("resourceVersion", resourceVersion); + } + HttpUrl watchUrl = watchUrlBuilder.build(); + String origin = watchUrl.url().getProtocol() + "://" + watchUrl.url().getHost(); + if (watchUrl.url().getPort() != -1) { + origin += ":" + watchUrl.url().getPort(); + } + + return new Request.Builder() + .get() + .url(watchUrl) + .addHeader("Origin", origin) + .build(); + } +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java index 82bf96b9141..83f4fd9219d 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java @@ -17,33 +17,19 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.fabric8.kubernetes.api.model.ListOptions; -import io.fabric8.kubernetes.api.model.Status; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; -import io.fabric8.kubernetes.client.WatcherException; -import io.fabric8.kubernetes.client.dsl.base.OperationSupport; -import io.fabric8.kubernetes.client.utils.Utils; import okhttp3.HttpUrl; import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; import okhttp3.WebSocket; -import okhttp3.WebSocketListener; -import okio.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicReference; -import static java.net.HttpURLConnection.HTTP_OK; - /** * This class just replicates WatchConnectionManager in handling watch connections but * instead of using a solid type for deserializing events, it uses plain strings. @@ -51,200 +37,48 @@ */ public class RawWatchConnectionManager extends AbstractWatchManager { private static final Logger logger = LoggerFactory.getLogger(RawWatchConnectionManager.class); - private ObjectMapper objectMapper; - - private final AtomicReference webSocketRef = new AtomicReference<>(); - /** True if an onOpen callback was received on the first connect attempt, ie. the watch was successfully started. */ - private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean reconnectPending = new AtomicBoolean(false); - /** Blocking queue for startup exceptions. */ - private final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1); public RawWatchConnectionManager(OkHttpClient okHttpClient, HttpUrl.Builder watchUrlBuilder, ListOptions listOptions, ObjectMapper objectMapper, final Watcher watcher, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) { super( watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, - okHttpClient.newBuilder().build(), new RawRequestBuilder(watchUrlBuilder) - ); - this.objectMapper = objectMapper; - + okHttpClient.newBuilder().build(), + new RawRequestBuilder(watchUrlBuilder)); + + initRunner(new WebSocketClientRunner(okHttpClient) { + @Override + WatcherWebSocketListener newListener(BlockingQueue queue, AtomicReference webSocketRef) { + return new RawWatcherWebSocketListener(RawWatchConnectionManager.this, queue, webSocketRef, objectMapper); + } + + @Override + OkHttpClient cloneAndCustomize(OkHttpClient client) { + return okHttpClient.newBuilder().build(); + } + }); runWatch(); } - static class RawRequestBuilder implements RequestBuilder { - private final HttpUrl.Builder watchUrlBuilder; + private static class RawWatcherWebSocketListener extends WatcherWebSocketListener { + private final ObjectMapper objectMapper; - public RawRequestBuilder(HttpUrl.Builder watchUrlBuilder) { - this.watchUrlBuilder = watchUrlBuilder; + public RawWatcherWebSocketListener(AbstractWatchManager manager, BlockingQueue queue, AtomicReference webSocketRef, ObjectMapper objectMapper) { + super(manager, queue, webSocketRef); + this.objectMapper = objectMapper; } @Override - public Request build(String resourceVersion) { - if (resourceVersion != null) { - watchUrlBuilder.removeAllQueryParameters("resourceVersion"); - watchUrlBuilder.addQueryParameter("resourceVersion", resourceVersion); + public void onMessage(WebSocket webSocket, String text) { + try { + Map watchEvent = objectMapper.readValue(text, HashMap.class); + + String watchEventType = watchEvent.get("type").toString(); + String watchObjectAsString = objectMapper.writeValueAsString(watchEvent.get("object")); + + manager.eventReceived(Watcher.Action.valueOf(watchEventType), watchObjectAsString); + + } catch (IOException exception) { + logger.error("Failed to deserialize watch response: " + exception.getMessage()); } - HttpUrl watchUrl = watchUrlBuilder.build(); - String origin = watchUrl.url().getProtocol() + "://" + watchUrl.url().getHost(); - if (watchUrl.url().getPort() != -1) { - origin += ":" + watchUrl.url().getPort(); - } - - return new Request.Builder() - .get() - .url(watchUrl) - .addHeader("Origin", origin) - .build(); } } - - private void runWatch() { - Request request = requestBuilder.build(resourceVersion.get()); - clonedClient.newWebSocket(request, new WebSocketListener() { - @Override - public void onOpen(WebSocket webSocket, Response response) { - logger.info("Websocket opened"); - webSocketRef.set(webSocket); - currentReconnectAttempt.set(0); - started.set(true); - queue.clear(); - queue.add(true); - } - - @Override - public void onMessage(WebSocket webSocket, String text) { - try { - Map watchEvent = objectMapper.readValue(text, HashMap.class); - - String watchEventType = watchEvent.get("type").toString(); - String watchObjectAsString = objectMapper.writeValueAsString(watchEvent.get("object")); - - watcher.eventReceived(Watcher.Action.valueOf(watchEventType), watchObjectAsString); - - } catch (IOException exception) { - logger.error("Failed to deserialize watch response: " + exception.getMessage()); - } - } - - @Override - public void onMessage(WebSocket webSocket, ByteString bytes) { - onMessage(webSocket, bytes.utf8()); - } - - @Override - public void onClosing(WebSocket webSocket, int code, String reason) { - logger.info("Socket closing: " + reason); - webSocket.close(code, reason); - } - - @Override - public void onClosed(WebSocket webSocket, int code, String reason) { - logger.debug("WebSocket close received. code: {}, reason: {}", code, reason); - if (forceClosed.get()) { - logger.debug("Ignoring onClose for already closed/closing websocket"); - return; - } - if (cannotReconnect()) { - closeEvent(new WatcherException("Connection unexpectedly closed")); - return; - } - scheduleReconnect(); - } - - @Override - public void onFailure(WebSocket webSocket, Throwable t, Response response) { - if (forceClosed.get()) { - logger.debug("Ignoring onFailure for already closed/closing websocket", t); - // avoid resource leak though - if (response != null && response.body() != null) { - response.body().close(); - } - return; - } - - // We do not expect a 200 in response to the websocket connection. If it occurs, we throw - // an exception and try the watch via a persistent HTTP Get. - if (response != null && response.code() == HTTP_OK) { - queue.clear(); - queue.offer(new KubernetesClientException("Received 200 on websocket", - response.code(), null)); - response.body().close(); - return; - } - - if (response != null) { - // We only need to queue startup failures. - Status status = OperationSupport.createStatus(response); - if (response.body() != null) { - response.body().close(); - } - logger.warn("Exec Failure: HTTP {}, Status: {} - {}", response.code(), status.getCode(), status.getMessage(), - t); - if (!started.get()) { - queue.clear(); - queue.offer(new KubernetesClientException(status)); - } - } else { - logger.warn("Exec Failure", t); - if (!started.get()) { - queue.clear(); - queue.offer(new KubernetesClientException("Failed to start websocket", t)); - } - } - - if (cannotReconnect()) { - closeEvent(new WatcherException("Connection failure", t)); - return; - } - - scheduleReconnect(); - } - }); - } - - private void scheduleReconnect() { - logger.debug("Submitting reconnect task to the executor"); - // make sure that whichever thread calls this method, the tasks are - // performed serially in the executor - submit(new NamedRunnable("scheduleReconnect") { - @Override - public void execute() { - if (!reconnectPending.compareAndSet(false, true)) { - logger.debug("Reconnect already scheduled"); - return; - } - webSocketRef.set(null); - try { - // actual reconnect only after the back-off time has passed, without - // blocking the thread - logger.debug("Scheduling reconnect task"); - schedule(new NamedRunnable("reconnectAttempt") { - @Override - public void execute() { - try { - runWatch(); - reconnectPending.set(false); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. - logger.error("Exception in reconnect", e); - webSocketRef.set(null); - closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e)); - close(); - } - } - }, nextReconnectInterval(), TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException e) { - reconnectPending.set(false); - } - } - }); - } - - public void waitUntilReady() { - Utils.waitUntilReady(queue, 10, TimeUnit.SECONDS); - } - - @Override - protected void internalClose() { - closeWebSocket(webSocketRef.getAndSet(null)); - } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index b34803075a1..6d5aaed7b5e 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -15,6 +15,12 @@ */ package io.fabric8.kubernetes.client.dsl.internal; +import java.net.MalformedURLException; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.ListOptions; @@ -25,43 +31,16 @@ import io.fabric8.kubernetes.client.Watcher.Action; import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.base.BaseOperation; -import io.fabric8.kubernetes.client.dsl.base.OperationSupport; -import io.fabric8.kubernetes.client.utils.HttpClientUtils; -import io.fabric8.kubernetes.client.utils.Utils; -import okhttp3.HttpUrl; import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; import okhttp3.WebSocket; -import okhttp3.WebSocketListener; -import okio.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - import static io.fabric8.kubernetes.client.dsl.internal.WatchHTTPManager.readWatchEvent; import static java.net.HttpURLConnection.HTTP_GONE; -import static java.net.HttpURLConnection.HTTP_OK; public class WatchConnectionManager> extends AbstractWatchManager { - - private static final Logger logger = LoggerFactory.getLogger(WatchConnectionManager.class); - private final AtomicReference webSocketRef = new AtomicReference<>(); - /** True if an onOpen callback was received on the first connect attempt, ie. the watch was successfully started. */ - private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean reconnectPending = new AtomicBoolean(false); - /** Blocking queue for startup exceptions. */ - private final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1); - public WatchConnectionManager(final OkHttpClient client, final BaseOperation baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout, int maxIntervalExponent) throws MalformedURLException { super( watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, @@ -69,251 +48,80 @@ public WatchConnectionManager(final OkHttpClient client, final BaseOperation baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout) throws MalformedURLException { - // Default max 32x slowdown from base interval - this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, websocketTimeout, 5); - } - - private void runWatch() { - final Request request = requestBuilder.build(resourceVersion.get()); - logger.debug("Watching {}...", request.url()); - - clonedClient.newWebSocket(request, new WebSocketListener() { + + initRunner(new WebSocketClientRunner(client) { @Override - public void onOpen(final WebSocket webSocket, Response response) { - if (response != null && response.body() != null) { - response.body().close(); - } - logger.debug("WebSocket successfully opened"); - webSocketRef.set(webSocket); - currentReconnectAttempt.set(0); - started.set(true); - queue.clear(); - queue.add(true); + WatcherWebSocketListener newListener(BlockingQueue queue, AtomicReference webSocketRef) { + return new TypedWatcherWebSocketListener<>(WatchConnectionManager.this, queue, webSocketRef); } - - @Override - public void onFailure(WebSocket webSocket, Throwable t, Response response) { - if (forceClosed.get()) { - logger.debug("Ignoring onFailure for already closed/closing websocket", t); - // avoid resource leak though - if (response != null && response.body() != null) { - response.body().close(); - } - return; - } - - // We do not expect a 200 in response to the websocket connection. If it occurs, we throw - // an exception and try the watch via a persistent HTTP Get. - // Newer Kubernetes might also return 503 Service Unavailable in case WebSockets are not supported - if (response != null && (response.code() == HTTP_OK || response.code() == 503)) { - queue.clear(); - queue.offer(new KubernetesClientException("Received " + response.code() + " on websocket", - response.code(), null)); - response.body().close(); - return; - } - - if (response != null) { - // We only need to queue startup failures. - Status status = OperationSupport.createStatus(response); - if (response.body() != null) { - response.body().close(); - } - logger.warn("Exec Failure: HTTP {}, Status: {} - {}", response.code(), status.getCode(), status.getMessage(), - t); - if (!started.get()) { - queue.clear(); - queue.offer(new KubernetesClientException(status)); - } - } else { - logger.warn("Exec Failure", t); - if (!started.get()) { - queue.clear(); - queue.offer(new KubernetesClientException("Failed to start websocket", t)); - } - } - - if (cannotReconnect()) { - closeEvent(new WatcherException("Connection failure", t)); - return; - } - - scheduleReconnect(); - } - - @Override - public void onMessage(WebSocket webSocket, ByteString bytes) { - onMessage(webSocket, bytes.utf8()); - } - - @Override - public void onMessage(WebSocket webSocket, String message) { - try { - WatchEvent event = readWatchEvent(message); - Object object = event.getObject(); - if (object instanceof HasMetadata) { - @SuppressWarnings("unchecked") - T obj = (T) object; - resourceVersion.set(obj.getMetadata().getResourceVersion()); - Watcher.Action action = Watcher.Action.valueOf(event.getType()); - watcher.eventReceived(action, obj); - } else if (object instanceof KubernetesResourceList) { - KubernetesResourceList list = (KubernetesResourceList) object; - // Dirty cast - should always be valid though - resourceVersion.set(list.getMetadata().getResourceVersion()); - Watcher.Action action = Watcher.Action.valueOf(event.getType()); - List items = list.getItems(); - if (items != null) { - for (HasMetadata item : items) { - watcher.eventReceived(action, (T) item); - } - } - } else if (object instanceof Status) { - Status status = (Status) object; - - // The resource version no longer exists - this has to be handled by the caller. - if (status.getCode() == HTTP_GONE) { - webSocketRef.set(null); // lose the ref: closing in close() would only generate a Broken pipe - // exception - // shut down executor, etc. - closeEvent(new WatcherException(status.getMessage(), new KubernetesClientException(status))); - close(); - return; - } - - watcher.eventReceived(Action.ERROR, null); - logger.error("Error received: {}", status); - } else { - logger.error("Unknown message received: {}", message); - } - } catch (ClassCastException e) { - logger.error("Received wrong type of object for watch", e); - } catch (IllegalArgumentException e) { - logger.error("Invalid event type", e); - } catch (Throwable e) { - logger.error("Unhandled exception encountered in watcher event handler", e); - } - } - - @Override - public void onClosing(WebSocket webSocket, int code, String reason) { - webSocket.close(code, reason); - } - - @Override - public void onClosed(WebSocket webSocket, int code, String reason) { - logger.debug("WebSocket close received. code: {}, reason: {}", code, reason); - if (forceClosed.get()) { - logger.debug("Ignoring onClose for already closed/closing websocket"); - return; - } - if (cannotReconnect()) { - closeEvent(new WatcherException("Connection unexpectedly closed")); - return; - } - scheduleReconnect(); - } - }); - } - - private void scheduleReconnect() { - - logger.debug("Submitting reconnect task to the executor"); - // make sure that whichever thread calls this method, the tasks are - // performed serially in the executor - submit(new NamedRunnable("scheduleReconnect") { + @Override - public void execute() { - if (!reconnectPending.compareAndSet(false, true)) { - logger.debug("Reconnect already scheduled"); - return; - } - webSocketRef.set(null); - try { - // actual reconnect only after the back-off time has passed, without - // blocking the thread - logger.debug("Scheduling reconnect task"); - schedule(new NamedRunnable("reconnectAttempt") { - @Override - public void execute() { - try { - runWatch(); - reconnectPending.set(false); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. - logger.error("Exception in reconnect", e); - webSocketRef.set(null); - closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e)); - close(); - } - } - }, nextReconnectInterval(), TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException e) { - reconnectPending.set(false); - } + OkHttpClient cloneAndCustomize(OkHttpClient client) { + return client.newBuilder() + .readTimeout(websocketTimeout, TimeUnit.MILLISECONDS) + .build(); } }); - } - - public void waitUntilReady() { - Utils.waitUntilReady(queue, 10, TimeUnit.SECONDS); + runWatch(); } - @Override - protected void internalClose() { - closeWebSocket(webSocketRef.getAndSet(null)); + public WatchConnectionManager(final OkHttpClient client, final BaseOperation baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout) throws MalformedURLException { + // Default max 32x slowdown from base interval + this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, websocketTimeout, 5); } - static class BaseOperationRequestBuilder implements RequestBuilder { - private final URL requestUrl; - private final BaseOperation baseOperation; - private final ListOptions listOptions; - - public BaseOperationRequestBuilder(BaseOperation baseOperation, ListOptions listOptions) throws MalformedURLException { - this.baseOperation = baseOperation; - this.requestUrl = baseOperation.getNamespacedUrl(); - this.listOptions = listOptions; + private static class TypedWatcherWebSocketListener extends WatcherWebSocketListener { + public TypedWatcherWebSocketListener(AbstractWatchManager manager, BlockingQueue queue, AtomicReference webSocketRef) { + super(manager, queue, webSocketRef); } @Override - public Request build(final String resourceVersion) { - HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder(); - - String labelQueryParam = baseOperation.getLabelQueryParam(); - if (Utils.isNotNullOrEmpty(labelQueryParam)) { - httpUrlBuilder.addQueryParameter("labelSelector", labelQueryParam); - } - - String fieldQueryString = baseOperation.getFieldQueryParam(); - String name = baseOperation.getName(); + public void onMessage(WebSocket webSocket, String message) { + try { + WatchEvent event = readWatchEvent(message); + Object object = event.getObject(); + if (object instanceof HasMetadata) { + @SuppressWarnings("unchecked") + T obj = (T) object; + manager.updateResourceVersion(obj.getMetadata().getResourceVersion()); + Action action = Action.valueOf(event.getType()); + manager.eventReceived(action, obj); + } else if (object instanceof KubernetesResourceList) { + // Dirty cast - should always be valid though + KubernetesResourceList list = (KubernetesResourceList) object; + manager.updateResourceVersion(list.getMetadata().getResourceVersion()); + Action action = Action.valueOf(event.getType()); + List items = list.getItems(); + if (items != null) { + for (HasMetadata item : items) { + manager.eventReceived(action, (T) item); + } + } + } else if (object instanceof Status) { + Status status = (Status) object; + + // The resource version no longer exists - this has to be handled by the caller. + if (status.getCode() == HTTP_GONE) { + webSocketRef.set(null); // lose the ref: closing in close() would only generate a Broken pipe + // exception + // shut down executor, etc. + manager.closeEvent(new WatcherException(status.getMessage(), new KubernetesClientException(status))); + manager.close(); + return; + } - if (name != null && name.length() > 0) { - if (fieldQueryString.length() > 0) { - fieldQueryString += ","; + manager.eventReceived(Action.ERROR, null); + logger.error("Error received: {}", status); + } else { + logger.error("Unknown message received: {}", message); } - fieldQueryString += "metadata.name=" + name; + } catch (ClassCastException e) { + logger.error("Received wrong type of object for watch", e); + } catch (IllegalArgumentException e) { + logger.error("Invalid event type", e); + } catch (Throwable e) { + logger.error("Unhandled exception encountered in watcher event handler", e); } - if (Utils.isNotNullOrEmpty(fieldQueryString)) { - httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString); - } - - listOptions.setResourceVersion(resourceVersion); - HttpClientUtils.appendListOptionParams(httpUrlBuilder, listOptions); - - String origin = requestUrl.getProtocol() + "://" + requestUrl.getHost(); - if (requestUrl.getPort() != -1) { - origin += ":" + requestUrl.getPort(); - } - - return new Request.Builder() - .get() - .url(httpUrlBuilder.build()) - .addHeader("Origin", origin) - .build(); } } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 2ecaf7f76d1..3cf9fc6157e 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,19 +16,13 @@ package io.fabric8.kubernetes.client.dsl.internal; -import static java.net.HttpURLConnection.HTTP_GONE; - import java.io.IOException; import java.net.MalformedURLException; -import java.net.URL; import java.util.List; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResource; import io.fabric8.kubernetes.api.model.KubernetesResourceList; @@ -41,23 +35,23 @@ import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.base.BaseOperation; import io.fabric8.kubernetes.client.dsl.base.OperationSupport; -import io.fabric8.kubernetes.client.utils.HttpClientUtils; import io.fabric8.kubernetes.client.utils.Serialization; -import io.fabric8.kubernetes.client.utils.Utils; import okhttp3.Call; import okhttp3.Callback; -import okhttp3.HttpUrl; import okhttp3.Interceptor; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import okhttp3.logging.HttpLoggingInterceptor; import okio.BufferedSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.net.HttpURLConnection.HTTP_GONE; public class WatchHTTPManager> extends AbstractWatchManager { private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class); - - private final AtomicBoolean reconnectPending = new AtomicBoolean(false); + public WatchHTTPManager(final OkHttpClient client, final BaseOperation baseOperation, @@ -67,176 +61,199 @@ public WatchHTTPManager(final OkHttpClient client, // Default max 32x slowdown from base interval this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, connectTimeout, 5); } - + public WatchHTTPManager(final OkHttpClient client, final BaseOperation baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, long connectTimeout, int maxIntervalExponent) throws MalformedURLException { - + super( watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, client.newBuilder() .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) .readTimeout(0, TimeUnit.MILLISECONDS) .cache(null) - .build(), new WatchConnectionManager.BaseOperationRequestBuilder(baseOperation, listOptions) + .build(), new BaseOperationRequestBuilder(baseOperation, listOptions) ); - // If we set the HttpLoggingInterceptor's logging level to Body (as it is by default), it does - // not let us stream responses from the server. - for (Interceptor i : clonedClient.networkInterceptors()) { - if (i instanceof HttpLoggingInterceptor) { - HttpLoggingInterceptor interceptor = (HttpLoggingInterceptor) i; - interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC); - } - } - - runWatch(); - } - - private void runWatch() { - final Request request = requestBuilder.build(resourceVersion.get()); - logger.debug("Watching {}...", request.url()); - - clonedClient.newCall(request).enqueue(new Callback() { - @Override - public void onFailure(Call call, IOException e) { - logger.info("Watch connection failed. reason: {}", e.getMessage()); - scheduleReconnect(true); - } - + initRunner(new HTTPClientRunner(client, this) { @Override - public void onResponse(Call call, Response response) throws IOException { - if (!response.isSuccessful()) { - onStatus(OperationSupport.createStatus(response.code(), response.message())); - } - - boolean shouldBackoff = true; - - try { - BufferedSource source = response.body().source(); - while (!source.exhausted()) { - String message = source.readUtf8LineStrict(); - onMessage(message); + OkHttpClient cloneAndCustomize(OkHttpClient client) { + final OkHttpClient clonedClient = client.newBuilder() + .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) + .readTimeout(0, TimeUnit.MILLISECONDS) + .cache(null) + .build(); + // If we set the HttpLoggingInterceptor's logging level to Body (as it is by default), it does + // not let us stream responses from the server. + for (Interceptor i : clonedClient.networkInterceptors()) { + if (i instanceof HttpLoggingInterceptor) { + HttpLoggingInterceptor interceptor = (HttpLoggingInterceptor) i; + interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC); } - // the normal operation of a long poll get is to return once a response is available. - // in that case we should reconnect immediately. - shouldBackoff = false; - } catch (Exception e) { - logger.info("Watch terminated unexpectedly. reason: {}", e.getMessage()); - } - - // if we get here, the source is exhausted, so, we have lost our "watch". - // we must reconnect. - if (response != null) { - response.body().close(); } - - scheduleReconnect(shouldBackoff); + return clonedClient; } }); + + runWatch(); } - - private void scheduleReconnect(boolean shouldBackoff) { - if (forceClosed.get()) { - logger.warn("Ignoring error for already closed/closing connection"); - return; + + private abstract static class HTTPClientRunner extends AbstractWatchManager.ClientRunner { + private final AbstractWatchManager manager; + private final AtomicBoolean reconnectPending = new AtomicBoolean(false); + + public HTTPClientRunner(OkHttpClient client, AbstractWatchManager manager) { + super(client); + this.manager = manager; } - - if (cannotReconnect()) { - watcher.onClose(new WatcherException("Connection unexpectedly closed")); - return; + + @Override + void run(Request request) { + client().newCall(request).enqueue(new Callback() { + @Override + public void onFailure(Call call, IOException e) { + logger.info("Watch connection failed. reason: {}", e.getMessage()); + scheduleReconnect(true); + } + + @Override + public void onResponse(Call call, Response response) throws IOException { + if (!response.isSuccessful()) { + onStatus(OperationSupport.createStatus(response.code(), response.message())); + } + + boolean shouldBackoff = true; + + try { + BufferedSource source = response.body().source(); + while (!source.exhausted()) { + String message = source.readUtf8LineStrict(); + onMessage(message); + } + // the normal operation of a long poll get is to return once a response is available. + // in that case we should reconnect immediately. + shouldBackoff = false; + } catch (Exception e) { + logger.info("Watch terminated unexpectedly. reason: {}", e.getMessage()); + } + + // if we get here, the source is exhausted, so, we have lost our "watch". + // we must reconnect. + if (response != null) { + response.body().close(); + } + + scheduleReconnect(shouldBackoff); + } + }); } - - logger.debug("Submitting reconnect task to the executor"); - - // make sure that whichever thread calls this method, the tasks are - // performed serially in the executor. - submit(() -> { - if (!reconnectPending.compareAndSet(false, true)) { - logger.debug("Reconnect already scheduled"); + + + @Override + void waitUntilReady() {} + + private void scheduleReconnect(boolean shouldBackoff) { + if (manager.isForceClosed()) { + logger.warn("Ignoring error for already closed/closing connection"); return; } - try { - // actual reconnect only after the back-off time has passed, without - // blocking the thread - logger.debug("Scheduling reconnect task"); - - long delay = shouldBackoff - ? nextReconnectInterval() - : 0; - - schedule(() -> { - try { - WatchHTTPManager.this.runWatch(); - reconnectPending.set(false); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. + + if (manager.cannotReconnect()) { + manager.onClose(new WatcherException("Connection unexpectedly closed")); + return; + } + + logger.debug("Submitting reconnect task to the executor"); + + // make sure that whichever thread calls this method, the tasks are + // performed serially in the executor. + manager.submit(() -> { + if (!reconnectPending.compareAndSet(false, true)) { + logger.debug("Reconnect already scheduled"); + return; + } + try { + // actual reconnect only after the back-off time has passed, without + // blocking the thread + logger.debug("Scheduling reconnect task"); + + long delay = shouldBackoff + ? manager.nextReconnectInterval() + : 0; + + manager.schedule(() -> { + try { + manager.runWatch(); + reconnectPending.set(false); + } catch (Exception e) { + // An unexpected error occurred and we didn't even get an onFailure callback. + logger.error("Exception in reconnect", e); + close(); + manager.onClose(new WatcherException("Unhandled exception in reconnect attempt", e)); + } + }, delay, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + // This is a standard exception if we close the scheduler. We should not print it + if (!manager.isForceClosed()) { logger.error("Exception in reconnect", e); - close(); - watcher.onClose(new WatcherException("Unhandled exception in reconnect attempt", e)); } - }, delay, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException e) { - // This is a standard exception if we close the scheduler. We should not print it - if (!forceClosed.get()) { - logger.error("Exception in reconnect", e); + reconnectPending.set(false); } - reconnectPending.set(false); - } - }); - } - - public void onMessage(String messageSource) { - try { - WatchEvent event = readWatchEvent(messageSource); - KubernetesResource object = event.getObject(); - if (object instanceof HasMetadata) { - @SuppressWarnings("unchecked") - T obj = (T) object; - // Dirty cast - should always be valid though - resourceVersion.set(obj.getMetadata().getResourceVersion()); - Watcher.Action action = Watcher.Action.valueOf(event.getType()); - watcher.eventReceived(action, obj); - } else if (object instanceof KubernetesResourceList) { - KubernetesResourceList list = (KubernetesResourceList) object; - // Dirty cast - should always be valid though - resourceVersion.set(list.getMetadata().getResourceVersion()); - Watcher.Action action = Watcher.Action.valueOf(event.getType()); - List items = list.getItems(); - if (items != null) { - for (HasMetadata item : items) { - watcher.eventReceived(action, (T) item); + }); + } + + public void onMessage(String messageSource) { + try { + WatchEvent event = readWatchEvent(messageSource); + KubernetesResource object = event.getObject(); + if (object instanceof HasMetadata) { + // Dirty cast - should always be valid though + @SuppressWarnings("unchecked") + T obj = (T) object; + manager.updateResourceVersion(obj.getMetadata().getResourceVersion()); + Watcher.Action action = Watcher.Action.valueOf(event.getType()); + manager.eventReceived(action, obj); + } else if (object instanceof KubernetesResourceList) { + KubernetesResourceList list = (KubernetesResourceList) object; + // Dirty cast - should always be valid though + manager.updateResourceVersion(list.getMetadata().getResourceVersion()); + Watcher.Action action = Watcher.Action.valueOf(event.getType()); + List items = list.getItems(); + if (items != null) { + for (HasMetadata item : items) { + manager.eventReceived(action, (T) item); + } } + } else if (object instanceof Status) { + onStatus((Status) object); + } else { + logger.error("Unknown message received: {}", messageSource); } - } else if (object instanceof Status) { - onStatus((Status) object); - } else { - logger.error("Unknown message received: {}", messageSource); + } catch (ClassCastException e) { + logger.error("Received wrong type of object for watch", e); + } catch (IllegalArgumentException e) { + logger.error("Invalid event type", e); } - } catch (ClassCastException e) { - logger.error("Received wrong type of object for watch", e); - } catch (IllegalArgumentException e) { - logger.error("Invalid event type", e); } - } - - private void onStatus(Status status) { - // The resource version no longer exists - this has to be handled by the caller. - if (status.getCode() == HTTP_GONE) { - // exception - // shut down executor, etc. - close(); - watcher.onClose(new WatcherException(status.getMessage(), new KubernetesClientException(status))); - return; + + private void onStatus(Status status) { + // The resource version no longer exists - this has to be handled by the caller. + if (status.getCode() == HTTP_GONE) { + // exception + // shut down executor, etc. + close(); + manager.onClose(new WatcherException(status.getMessage(), new KubernetesClientException(status))); + return; + } + + manager.eventReceived(Action.ERROR, null); + logger.error("Error received: {}", status.toString()); } - - watcher.eventReceived(Action.ERROR, null); - logger.error("Error received: {}", status.toString()); } - - + + protected static WatchEvent readWatchEvent(String messageSource) { WatchEvent event = Serialization.unmarshal(messageSource, WatchEvent.class); KubernetesResource object = null; diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java new file mode 100644 index 00000000000..e816054726a --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -0,0 +1,183 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.dsl.internal; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import io.fabric8.kubernetes.api.model.Status; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.dsl.base.OperationSupport; +import okhttp3.Response; +import okhttp3.WebSocket; +import okhttp3.WebSocketListener; +import okio.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.net.HttpURLConnection.HTTP_OK; + +abstract class WatcherWebSocketListener extends WebSocketListener { + protected static final Logger logger = LoggerFactory.getLogger(WatchConnectionManager.class); + + protected final AtomicReference webSocketRef; + /** + * True if an onOpen callback was received on the first connect attempt, ie. the watch was successfully started. + */ + private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean reconnectPending = new AtomicBoolean(false); + /** + * Blocking queue for startup exceptions. + */ + private final BlockingQueue queue; + protected final AbstractWatchManager manager; + + public WatcherWebSocketListener(AbstractWatchManager manager, BlockingQueue queue, AtomicReference webSocketRef) { + this.manager = manager; + this.queue = queue; + this.webSocketRef = webSocketRef; + } + + @Override + public void onOpen(final WebSocket webSocket, Response response) { + if (response != null && response.body() != null) { + response.body().close(); + } + logger.debug("WebSocket successfully opened"); + webSocketRef.set(webSocket); + manager.resetReconnectAttempts(); + started.set(true); + queue.clear(); + queue.add(true); + } + + @Override + public void onFailure(WebSocket webSocket, Throwable t, Response response) { + if (manager.isForceClosed()) { + logger.debug("Ignoring onFailure for already closed/closing websocket", t); + // avoid resource leak though + if (response != null && response.body() != null) { + response.body().close(); + } + return; + } + + // We do not expect a 200 in response to the websocket connection. If it occurs, we throw + // an exception and try the watch via a persistent HTTP Get. + // Newer Kubernetes might also return 503 Service Unavailable in case WebSockets are not supported + if (response != null && (response.code() == HTTP_OK || response.code() == 503)) { + queue.clear(); + queue.offer(new KubernetesClientException("Received " + response.code() + " on websocket", + response.code(), null)); + response.body().close(); + return; + } + + if (response != null) { + // We only need to queue startup failures. + Status status = OperationSupport.createStatus(response); + if (response.body() != null) { + response.body().close(); + } + logger.warn("Exec Failure: HTTP {}, Status: {} - {}", response.code(), status.getCode(), status.getMessage(), + t); + if (!started.get()) { + queue.clear(); + queue.offer(new KubernetesClientException(status)); + } + } else { + logger.warn("Exec Failure", t); + if (!started.get()) { + queue.clear(); + queue.offer(new KubernetesClientException("Failed to start websocket", t)); + } + } + + if (manager.cannotReconnect()) { + manager.closeEvent(new WatcherException("Connection failure", t)); + return; + } + + scheduleReconnect(); + } + + @Override + public void onMessage(WebSocket webSocket, ByteString bytes) { + onMessage(webSocket, bytes.utf8()); + } + + @Override + public void onClosing(WebSocket webSocket, int code, String reason) { + logger.debug("Socket closing: " + reason); + webSocket.close(code, reason); + } + + @Override + public void onClosed(WebSocket webSocket, int code, String reason) { + logger.debug("WebSocket close received. code: {}, reason: {}", code, reason); + if (manager.isForceClosed()) { + logger.debug("Ignoring onClose for already closed/closing websocket"); + return; + } + if (manager.cannotReconnect()) { + manager.closeEvent(new WatcherException("Connection unexpectedly closed")); + return; + } + scheduleReconnect(); + } + + private void scheduleReconnect() { + logger.debug("Submitting reconnect task to the executor"); + // make sure that whichever thread calls this method, the tasks are + // performed serially in the executor + manager.submit(new NamedRunnable("scheduleReconnect") { + @Override + public void execute() { + if (!reconnectPending.compareAndSet(false, true)) { + logger.debug("Reconnect already scheduled"); + return; + } + webSocketRef.set(null); + try { + // actual reconnect only after the back-off time has passed, without + // blocking the thread + logger.debug("Scheduling reconnect task"); + manager.schedule(new NamedRunnable("reconnectAttempt") { + @Override + public void execute() { + try { + manager.runWatch(); + reconnectPending.set(false); + } catch (Exception e) { + // An unexpected error occurred and we didn't even get an onFailure callback. + logger.error("Exception in reconnect", e); + webSocketRef.set(null); + manager.closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e)); + manager.close(); + } + } + }, manager.nextReconnectInterval(), TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + reconnectPending.set(false); + } + } + }); + } +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java new file mode 100644 index 00000000000..7e921afd379 --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.dsl.internal; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import io.fabric8.kubernetes.client.utils.Utils; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.WebSocket; + +abstract class WebSocketClientRunner extends AbstractWatchManager.ClientRunner { + private final AtomicReference webSocketRef = new AtomicReference<>(); + private final BlockingQueue queue = new ArrayBlockingQueue<>(1); + + public WebSocketClientRunner(OkHttpClient client) { + super(client); + } + + @Override + public void run(Request request) { + client().newWebSocket(request, newListener(queue, webSocketRef)); + } + + abstract WatcherWebSocketListener newListener(BlockingQueue queue, AtomicReference webSocketRef); + + @Override + public void close() { + AbstractWatchManager.closeWebSocket(webSocketRef.getAndSet(null)); + } + + @Override + public void waitUntilReady() { + Utils.waitUntilReady(queue, 10, TimeUnit.SECONDS); + } +} From 1a12b1069175bbd32ece2354199c39ea6bd8d563 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 11 Jan 2021 22:35:40 +0100 Subject: [PATCH 04/10] refactor: remove now unneeded client from AbstractWatchManager --- .../client/dsl/internal/AbstractWatchManager.java | 5 +---- .../dsl/internal/BaseOperationRequestBuilder.java | 3 ++- .../client/dsl/internal/RawRequestBuilder.java | 2 +- .../client/dsl/internal/RawWatchConnectionManager.java | 5 +---- .../client/dsl/internal/WatchConnectionManager.java | 5 +---- .../client/dsl/internal/WatchHTTPManager.java | 10 ++-------- .../client/dsl/internal/WatcherWebSocketListener.java | 2 +- .../client/dsl/internal/WebSocketClientRunner.java | 2 +- 8 files changed, 10 insertions(+), 24 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index 45621043f52..069c7e1c3d0 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -39,7 +39,6 @@ public abstract class AbstractWatchManager implements Watch { final Watcher watcher; final AtomicReference resourceVersion; - final OkHttpClient clonedClient; final AtomicBoolean forceClosed; private final int reconnectLimit; @@ -53,14 +52,12 @@ public abstract class AbstractWatchManager implements Watch { AbstractWatchManager( - Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, - OkHttpClient clonedClient, RequestBuilder requestBuilder + Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, RequestBuilder requestBuilder ) { this.watcher = watcher; this.reconnectLimit = reconnectLimit; this.reconnectInterval = reconnectInterval; this.maxIntervalExponent = maxIntervalExponent; - this.clonedClient = clonedClient; this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion()); this.currentReconnectAttempt = new AtomicInteger(0); this.forceClosed = new AtomicBoolean(); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java index 9cc7f9d0944..b11075cb67e 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java @@ -5,12 +5,13 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and + * limitations under the License. */ package io.fabric8.kubernetes.client.dsl.internal; diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawRequestBuilder.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawRequestBuilder.java index 5434865aded..ba9b92fc7d2 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawRequestBuilder.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawRequestBuilder.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java index 83f4fd9219d..878789555ef 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java @@ -39,10 +39,7 @@ public class RawWatchConnectionManager extends AbstractWatchManager { private static final Logger logger = LoggerFactory.getLogger(RawWatchConnectionManager.class); public RawWatchConnectionManager(OkHttpClient okHttpClient, HttpUrl.Builder watchUrlBuilder, ListOptions listOptions, ObjectMapper objectMapper, final Watcher watcher, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) { - super( - watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, - okHttpClient.newBuilder().build(), - new RawRequestBuilder(watchUrlBuilder)); + super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, new RawRequestBuilder(watchUrlBuilder)); initRunner(new WebSocketClientRunner(okHttpClient) { @Override diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index 6d5aaed7b5e..7aabc040065 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -43,10 +43,7 @@ public class WatchConnectionManager baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout, int maxIntervalExponent) throws MalformedURLException { super( - watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, - client.newBuilder() - .readTimeout(websocketTimeout, TimeUnit.MILLISECONDS) - .build(), new BaseOperationRequestBuilder(baseOperation, listOptions) + watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, new BaseOperationRequestBuilder(baseOperation, listOptions) ); initRunner(new WebSocketClientRunner(client) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 3cf9fc6157e..93efbaf8314 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package io.fabric8.kubernetes.client.dsl.internal; import java.io.IOException; @@ -69,12 +68,7 @@ public WatchHTTPManager(final OkHttpClient client, throws MalformedURLException { super( - watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, - client.newBuilder() - .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) - .readTimeout(0, TimeUnit.MILLISECONDS) - .cache(null) - .build(), new BaseOperationRequestBuilder(baseOperation, listOptions) + watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, new BaseOperationRequestBuilder(baseOperation, listOptions) ); initRunner(new HTTPClientRunner(client, this) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java index e816054726a..9bf5fd4816e 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java index 7e921afd379..7123e5dbd0a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, From 9412ddc76c08e2f06c3529cc777ae224ddef1f76 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 11 Jan 2021 22:51:08 +0100 Subject: [PATCH 05/10] fix: make test compile --- .../dsl/internal/AbstractWatchManager.java | 2 +- .../client/dsl/internal/WatchHTTPManager.java | 10 +++------- .../dsl/internal/AbstractWatchManagerTest.java | 16 ++++++++++++---- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index 069c7e1c3d0..26003a94e24 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -199,7 +199,7 @@ public ClientRunner(OkHttpClient client) { abstract void run(Request request); void close() {} - abstract void waitUntilReady(); + void waitUntilReady() {} abstract OkHttpClient cloneAndCustomize(OkHttpClient client); OkHttpClient client() { return client; diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 93efbaf8314..93bf05d80c0 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -144,10 +144,6 @@ public void onResponse(Call call, Response response) throws IOException { }); } - - @Override - void waitUntilReady() {} - private void scheduleReconnect(boolean shouldBackoff) { if (manager.isForceClosed()) { logger.warn("Ignoring error for already closed/closing connection"); @@ -197,7 +193,7 @@ private void scheduleReconnect(boolean shouldBackoff) { } }); } - + public void onMessage(String messageSource) { try { WatchEvent event = readWatchEvent(messageSource); @@ -231,7 +227,7 @@ public void onMessage(String messageSource) { logger.error("Invalid event type", e); } } - + private void onStatus(Status status) { // The resource version no longer exists - this has to be handled by the caller. if (status.getCode() == HTTP_GONE) { @@ -241,7 +237,7 @@ private void onStatus(Status status) { manager.onClose(new WatcherException(status.getMessage(), new KubernetesClientException(status))); return; } - + manager.eventReceived(Action.ERROR, null); logger.error("Error received: {}", status.toString()); } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java index cfa29824c63..a1a482b2ab8 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java @@ -196,9 +196,8 @@ private static class WatcherAdapter implements Watcher { private final AtomicInteger closeCount = new AtomicInteger(0); @Override - public void eventReceived(Action action, T resource) { - - } + public void eventReceived(Action action, T resource) {} + @Override public void onClose(WatcherException cause) { closeCount.addAndGet(1); @@ -213,7 +212,16 @@ public void onClose() { private static final class WatchManager extends AbstractWatchManager { public WatchManager(Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, OkHttpClient clonedClient) { - super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, clonedClient, resourceVersion -> null); + super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, resourceVersion -> null); + initRunner(new ClientRunner(clonedClient) { + @Override + void run(Request request) {} + + @Override + OkHttpClient cloneAndCustomize(OkHttpClient client) { + return clonedClient; + } + }); } @Override public void close() { From cb05a21753d2f0ea2192cbf77e34b859f38ba517 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 11 Jan 2021 23:09:21 +0100 Subject: [PATCH 06/10] fix: address some Sonar issues --- .../client/dsl/internal/AbstractWatchManager.java | 4 ++-- .../client/dsl/internal/RawWatchConnectionManager.java | 2 -- .../client/dsl/internal/WatchConnectionManager.java | 2 -- .../client/dsl/internal/WatcherWebSocketListener.java | 6 +++--- .../client/dsl/internal/WebSocketClientRunner.java | 2 +- 5 files changed, 6 insertions(+), 10 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index 26003a94e24..1b2f0ea938b 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -190,10 +190,10 @@ interface RequestBuilder { Request build(final String resourceVersion); } - static abstract class ClientRunner { + abstract static class ClientRunner { private final OkHttpClient client; - public ClientRunner(OkHttpClient client) { + protected ClientRunner(OkHttpClient client) { this.client = cloneAndCustomize(client); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java index 878789555ef..7f4a19e62ca 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java @@ -36,8 +36,6 @@ * */ public class RawWatchConnectionManager extends AbstractWatchManager { - private static final Logger logger = LoggerFactory.getLogger(RawWatchConnectionManager.class); - public RawWatchConnectionManager(OkHttpClient okHttpClient, HttpUrl.Builder watchUrlBuilder, ListOptions listOptions, ObjectMapper objectMapper, final Watcher watcher, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) { super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, new RawRequestBuilder(watchUrlBuilder)); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index 7aabc040065..056428eb720 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -33,8 +33,6 @@ import io.fabric8.kubernetes.client.dsl.base.BaseOperation; import okhttp3.OkHttpClient; import okhttp3.WebSocket; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static io.fabric8.kubernetes.client.dsl.internal.WatchHTTPManager.readWatchEvent; import static java.net.HttpURLConnection.HTTP_GONE; diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java index 9bf5fd4816e..7d15c21494c 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -35,7 +35,7 @@ import static java.net.HttpURLConnection.HTTP_OK; abstract class WatcherWebSocketListener extends WebSocketListener { - protected static final Logger logger = LoggerFactory.getLogger(WatchConnectionManager.class); + protected static final Logger logger = LoggerFactory.getLogger(WatcherWebSocketListener.class); protected final AtomicReference webSocketRef; /** @@ -49,7 +49,7 @@ abstract class WatcherWebSocketListener extends WebSocketListener { private final BlockingQueue queue; protected final AbstractWatchManager manager; - public WatcherWebSocketListener(AbstractWatchManager manager, BlockingQueue queue, AtomicReference webSocketRef) { + protected WatcherWebSocketListener(AbstractWatchManager manager, BlockingQueue queue, AtomicReference webSocketRef) { this.manager = manager; this.queue = queue; this.webSocketRef = webSocketRef; @@ -125,7 +125,7 @@ public void onMessage(WebSocket webSocket, ByteString bytes) { @Override public void onClosing(WebSocket webSocket, int code, String reason) { - logger.debug("Socket closing: " + reason); + logger.debug("Socket closing: {}", reason); webSocket.close(code, reason); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java index 7123e5dbd0a..228ef6b4f7e 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java @@ -29,7 +29,7 @@ abstract class WebSocketClientRunner extends AbstractWatchManager.ClientRunne private final AtomicReference webSocketRef = new AtomicReference<>(); private final BlockingQueue queue = new ArrayBlockingQueue<>(1); - public WebSocketClientRunner(OkHttpClient client) { + protected WebSocketClientRunner(OkHttpClient client) { super(client); } From db7bd811bced8bfe109a2b870ce97a45fc1025b7 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 12 Jan 2021 09:37:51 +0100 Subject: [PATCH 07/10] refactor: simplify onFailure, log error if exception cannot be queued --- .../internal/WatcherWebSocketListener.java | 45 ++++++++++--------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java index 7d15c21494c..a45cb865238 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import static java.net.HttpURLConnection.HTTP_OK; +import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; abstract class WatcherWebSocketListener extends WebSocketListener { protected static final Logger logger = LoggerFactory.getLogger(WatcherWebSocketListener.class); @@ -78,35 +79,30 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) { } return; } - - // We do not expect a 200 in response to the websocket connection. If it occurs, we throw - // an exception and try the watch via a persistent HTTP Get. - // Newer Kubernetes might also return 503 Service Unavailable in case WebSockets are not supported - if (response != null && (response.code() == HTTP_OK || response.code() == 503)) { - queue.clear(); - queue.offer(new KubernetesClientException("Received " + response.code() + " on websocket", - response.code(), null)); - response.body().close(); - return; - } - + if (response != null) { - // We only need to queue startup failures. - Status status = OperationSupport.createStatus(response); if (response.body() != null) { response.body().close(); } - logger.warn("Exec Failure: HTTP {}, Status: {} - {}", response.code(), status.getCode(), status.getMessage(), - t); - if (!started.get()) { - queue.clear(); - queue.offer(new KubernetesClientException(status)); + final int code = response.code(); + // We do not expect a 200 in response to the websocket connection. If it occurs, we throw + // an exception and try the watch via a persistent HTTP Get. + // Newer Kubernetes might also return 503 Service Unavailable in case WebSockets are not supported + if (HTTP_OK == code || HTTP_UNAVAILABLE == code) { + pushException(new KubernetesClientException("Received " + code + " on websocket", code, null)); + return; + } else { + // We only need to queue startup failures. + Status status = OperationSupport.createStatus(response); + logger.warn("Exec Failure: HTTP {}, Status: {} - {}", code, status.getCode(), status.getMessage(), t); + if (!started.get()) { + pushException(new KubernetesClientException(status)); + } } } else { logger.warn("Exec Failure", t); if (!started.get()) { - queue.clear(); - queue.offer(new KubernetesClientException("Failed to start websocket", t)); + pushException(new KubernetesClientException("Failed to start websocket", t)); } } @@ -118,6 +114,13 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) { scheduleReconnect(); } + private void pushException(KubernetesClientException exception) { + queue.clear(); + if (!queue.offer(exception)) { + logger.debug("Couldn't add exception " + exception + " to queue"); + } + } + @Override public void onMessage(WebSocket webSocket, ByteString bytes) { onMessage(webSocket, bytes.utf8()); From f6a15cdaa34e0e3f4ed22b091767f8e43945d95b Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 12 Jan 2021 09:47:58 +0100 Subject: [PATCH 08/10] refactor: make BaseOperationRequestBuilder generic --- .../client/dsl/internal/BaseOperationRequestBuilder.java | 8 +++++--- .../client/dsl/internal/WatchConnectionManager.java | 2 +- .../kubernetes/client/dsl/internal/WatchHTTPManager.java | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java index b11075cb67e..5a8936bbf07 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java @@ -18,6 +18,8 @@ import java.net.MalformedURLException; import java.net.URL; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.ListOptions; import io.fabric8.kubernetes.client.dsl.base.BaseOperation; import io.fabric8.kubernetes.client.utils.HttpClientUtils; @@ -25,12 +27,12 @@ import okhttp3.HttpUrl; import okhttp3.Request; -class BaseOperationRequestBuilder implements AbstractWatchManager.RequestBuilder { +class BaseOperationRequestBuilder> implements AbstractWatchManager.RequestBuilder { private final URL requestUrl; - private final BaseOperation baseOperation; + private final BaseOperation baseOperation; private final ListOptions listOptions; - public BaseOperationRequestBuilder(BaseOperation baseOperation, ListOptions listOptions) throws MalformedURLException { + public BaseOperationRequestBuilder(BaseOperation baseOperation, ListOptions listOptions) throws MalformedURLException { this.baseOperation = baseOperation; this.requestUrl = baseOperation.getNamespacedUrl(); this.listOptions = listOptions; diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index 056428eb720..97a450116cc 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -41,7 +41,7 @@ public class WatchConnectionManager baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout, int maxIntervalExponent) throws MalformedURLException { super( - watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, new BaseOperationRequestBuilder(baseOperation, listOptions) + watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, new BaseOperationRequestBuilder<>(baseOperation, listOptions) ); initRunner(new WebSocketClientRunner(client) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 93bf05d80c0..0ab76a81497 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -68,7 +68,7 @@ public WatchHTTPManager(final OkHttpClient client, throws MalformedURLException { super( - watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, new BaseOperationRequestBuilder(baseOperation, listOptions) + watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, new BaseOperationRequestBuilder<>(baseOperation, listOptions) ); initRunner(new HTTPClientRunner(client, this) { From 5664d50c1ab75e5cff4c527bfdb6a4c6ce685759 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 12 Jan 2021 11:26:09 +0100 Subject: [PATCH 09/10] fix: only close body after we're done with it --- .../client/dsl/internal/WatcherWebSocketListener.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java index a45cb865238..7caf9cea9b6 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -81,19 +81,18 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) { } if (response != null) { - if (response.body() != null) { - response.body().close(); - } final int code = response.code(); // We do not expect a 200 in response to the websocket connection. If it occurs, we throw // an exception and try the watch via a persistent HTTP Get. // Newer Kubernetes might also return 503 Service Unavailable in case WebSockets are not supported if (HTTP_OK == code || HTTP_UNAVAILABLE == code) { pushException(new KubernetesClientException("Received " + code + " on websocket", code, null)); + closeBody(response); return; } else { // We only need to queue startup failures. Status status = OperationSupport.createStatus(response); + closeBody(response); logger.warn("Exec Failure: HTTP {}, Status: {} - {}", code, status.getCode(), status.getMessage(), t); if (!started.get()) { pushException(new KubernetesClientException(status)); @@ -121,6 +120,12 @@ private void pushException(KubernetesClientException exception) { } } + private void closeBody(Response response) { + if (response.body() != null) { + response.body().close(); + } + } + @Override public void onMessage(WebSocket webSocket, ByteString bytes) { onMessage(webSocket, bytes.utf8()); From ce1ddb93107847e00c15e69f7a0bb88f857e8cc7 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 12 Jan 2021 13:04:42 +0100 Subject: [PATCH 10/10] fix: use built-in formatting --- .../client/dsl/internal/WatcherWebSocketListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java index 7caf9cea9b6..b42dccab410 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -116,7 +116,7 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) { private void pushException(KubernetesClientException exception) { queue.clear(); if (!queue.offer(exception)) { - logger.debug("Couldn't add exception " + exception + " to queue"); + logger.debug("Couldn't add exception {} to queue", exception.getLocalizedMessage()); } }