Skip to content

Commit

Permalink
Merge pull request fabric8io#2714 from fabric8io/refactor-watchers
Browse files Browse the repository at this point in the history
Refactor watchers
  • Loading branch information
manusa committed Jan 15, 2021
2 parents 54561af + ce1ddb9 commit 32ae318
Show file tree
Hide file tree
Showing 9 changed files with 720 additions and 674 deletions.
Expand Up @@ -21,6 +21,7 @@
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,28 +38,26 @@ public abstract class AbstractWatchManager<T> implements Watch {
private static final Logger logger = LoggerFactory.getLogger(AbstractWatchManager.class);

final Watcher<T> watcher;
final ListOptions listOptions;
final AtomicReference<String> resourceVersion;
final OkHttpClient clonedClient;

final AtomicBoolean forceClosed;
private final int reconnectLimit;
private final int reconnectInterval;
private final int maxIntervalExponent;
final AtomicInteger currentReconnectAttempt;
private final ScheduledExecutorService executorService;

private final RequestBuilder requestBuilder;
protected ClientRunner runner;


AbstractWatchManager(
Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent,
OkHttpClient clonedClient
Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, RequestBuilder requestBuilder
) {
this.watcher = watcher;
this.listOptions = listOptions;
this.reconnectLimit = reconnectLimit;
this.reconnectInterval = reconnectInterval;
this.maxIntervalExponent = maxIntervalExponent;
this.clonedClient = clonedClient;
this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion());
this.currentReconnectAttempt = new AtomicInteger(0);
this.forceClosed = new AtomicBoolean();
Expand All @@ -67,6 +66,15 @@ public abstract class AbstractWatchManager<T> implements Watch {
ret.setDaemon(true);
return ret;
});

this.requestBuilder = requestBuilder;
}

protected void initRunner(ClientRunner runner) {
if (this.runner != null) {
throw new IllegalStateException("ClientRunner has already been initialized");
}
this.runner = runner;
}

final void closeEvent(WatcherException cause) {
Expand Down Expand Up @@ -124,6 +132,37 @@ final long nextReconnectInterval() {
logger.debug("Current reconnect backoff is {} milliseconds (T{})", ret, exponentOfTwo);
return ret;
}

void resetReconnectAttempts() {
currentReconnectAttempt.set(0);
}

boolean isForceClosed() {
return forceClosed.get();
}

void eventReceived(Watcher.Action action, T resource) {
watcher.eventReceived(action, resource);
}

void onClose(WatcherException cause) {
watcher.onClose(cause);
}

void updateResourceVersion(final String newResourceVersion) {
resourceVersion.set(newResourceVersion);
}

protected void runWatch() {
final Request request = requestBuilder.build(resourceVersion.get());
logger.debug("Watching {}...", request.url());

runner.run(request);
}

public void waitUntilReady() {
runner.waitUntilReady();
}

static void closeWebSocket(WebSocket webSocket) {
if (webSocket != null) {
Expand All @@ -137,4 +176,33 @@ static void closeWebSocket(WebSocket webSocket) {
}
}
}

@Override
public void close() {
logger.debug("Force closing the watch {}", this);
closeEvent();
runner.close();
closeExecutorService();
}

@FunctionalInterface
interface RequestBuilder {
Request build(final String resourceVersion);
}

abstract static class ClientRunner {
private final OkHttpClient client;

protected ClientRunner(OkHttpClient client) {
this.client = cloneAndCustomize(client);
}

abstract void run(Request request);
void close() {}
void waitUntilReady() {}
abstract OkHttpClient cloneAndCustomize(OkHttpClient client);
OkHttpClient client() {
return client;
}
}
}
@@ -0,0 +1,77 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.dsl.internal;

import java.net.MalformedURLException;
import java.net.URL;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;
import io.fabric8.kubernetes.client.utils.HttpClientUtils;
import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.HttpUrl;
import okhttp3.Request;

class BaseOperationRequestBuilder<T extends HasMetadata, L extends KubernetesResourceList<T>> implements AbstractWatchManager.RequestBuilder {
private final URL requestUrl;
private final BaseOperation<T, L, ?> baseOperation;
private final ListOptions listOptions;

public BaseOperationRequestBuilder(BaseOperation<T, L, ?> baseOperation, ListOptions listOptions) throws MalformedURLException {
this.baseOperation = baseOperation;
this.requestUrl = baseOperation.getNamespacedUrl();
this.listOptions = listOptions;
}

@Override
public Request build(final String resourceVersion) {
HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder();

String labelQueryParam = baseOperation.getLabelQueryParam();
if (Utils.isNotNullOrEmpty(labelQueryParam)) {
httpUrlBuilder.addQueryParameter("labelSelector", labelQueryParam);
}

String fieldQueryString = baseOperation.getFieldQueryParam();
String name = baseOperation.getName();

if (name != null && name.length() > 0) {
if (fieldQueryString.length() > 0) {
fieldQueryString += ",";
}
fieldQueryString += "metadata.name=" + name;
}
if (Utils.isNotNullOrEmpty(fieldQueryString)) {
httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString);
}

listOptions.setResourceVersion(resourceVersion);
HttpClientUtils.appendListOptionParams(httpUrlBuilder, listOptions);

String origin = requestUrl.getProtocol() + "://" + requestUrl.getHost();
if (requestUrl.getPort() != -1) {
origin += ":" + requestUrl.getPort();
}

return new Request.Builder()
.get()
.url(httpUrlBuilder.build())
.addHeader("Origin", origin)
.build();
}
}
@@ -0,0 +1,46 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.dsl.internal;

import okhttp3.HttpUrl;
import okhttp3.Request;

class RawRequestBuilder implements AbstractWatchManager.RequestBuilder {
private final HttpUrl.Builder watchUrlBuilder;

public RawRequestBuilder(HttpUrl.Builder watchUrlBuilder) {
this.watchUrlBuilder = watchUrlBuilder;
}

@Override
public Request build(String resourceVersion) {
if (resourceVersion != null) {
watchUrlBuilder.removeAllQueryParameters("resourceVersion");
watchUrlBuilder.addQueryParameter("resourceVersion", resourceVersion);
}
HttpUrl watchUrl = watchUrlBuilder.build();
String origin = watchUrl.url().getProtocol() + "://" + watchUrl.url().getHost();
if (watchUrl.url().getPort() != -1) {
origin += ":" + watchUrl.url().getPort();
}

return new Request.Builder()
.get()
.url(watchUrl)
.addHeader("Origin", origin)
.build();
}
}

0 comments on commit 32ae318

Please sign in to comment.