Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #4201: generalizing sendAsync support #4430

Merged
merged 10 commits into from
Nov 29, 2022
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

#### Improvements
* Fix #4355: for exec, attach, upload, and copy operations the container id/name will be validated or chosen prior to the remote call. You may also use the kubectl.kubernetes.io/default-container annotation to specify the default container.
* Fix #4530: generalizing the Serialization logic to allow for primitive values and clarifying the type expectations.
* Fix #4530: generalizing the Serialization logic to allow for primitive values and clarifying the type expectations.
* Fix #4201: Removed sendAsync from the individual http client implementations

#### Dependency Upgrade

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,31 @@
package io.fabric8.kubernetes.client.jdkhttp;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.http.AsyncBody;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
import io.fabric8.kubernetes.client.http.Interceptor;
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.kubernetes.client.http.WebSocket.Listener;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URI;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.BodySubscribers;
import java.net.http.HttpResponse.ResponseInfo;
import java.net.http.WebSocketHandshakeException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

Expand All @@ -55,48 +52,80 @@
*/
public class JdkHttpClientImpl implements HttpClient {

private final class AsyncBodySubscriber<T> implements Subscriber<T>, AsyncBody {
private final BodyConsumer<T> consumer;
/**
* Adapts the BodyHandler to immediately complete the body
*/
private static final class BodyHandlerAdapter implements BodyHandler<AsyncBody> {
private final AsyncBodySubscriber<?> subscriber;
private final BodyHandler<Void> handler;

private BodyHandlerAdapter(AsyncBodySubscriber<?> subscriber, BodyHandler<Void> handler) {
this.subscriber = subscriber;
this.handler = handler;
}

@Override
public BodySubscriber<AsyncBody> apply(ResponseInfo responseInfo) {
BodySubscriber<Void> bodySubscriber = handler.apply(responseInfo);
return new BodySubscriber<AsyncBody>() {
CompletableFuture<AsyncBody> cf = CompletableFuture.completedFuture(subscriber);

@Override
public void onSubscribe(Subscription subscription) {
bodySubscriber.onSubscribe(subscription);
}

@Override
public void onNext(List<ByteBuffer> item) {
bodySubscriber.onNext(item);
}

@Override
public void onError(Throwable throwable) {
bodySubscriber.onError(throwable);
}

@Override
public void onComplete() {
bodySubscriber.onComplete();
}

@Override
public CompletionStage<AsyncBody> getBody() {
return cf;
}
};
}
}

private static final class AsyncBodySubscriber<T> implements Subscriber<T>, AsyncBody {
private final AsyncBody.Consumer<T> consumer;
private CompletableFuture<Void> done = new CompletableFuture<Void>();
private final AtomicBoolean subscribed = new AtomicBoolean();
private volatile Flow.Subscription subscription;
private T initialItem;
private boolean first = true;
private boolean isComplete;
private CompletableFuture<Flow.Subscription> subscription = new CompletableFuture<>();

private AsyncBodySubscriber(BodyConsumer<T> consumer) {
private AsyncBodySubscriber(AsyncBody.Consumer<T> consumer) {
this.consumer = consumer;
}

@Override
public void onSubscribe(Subscription subscription) {
if (!subscribed.compareAndSet(false, true)) {
if (this.subscription.isDone()) {
subscription.cancel();
return;
}
this.subscription = subscription;
// the sendAsync future won't complete unless we do the initial request here
// so in onNext we'll trap the item until we're ready
subscription.request(1);
this.subscription.complete(subscription);
}

@Override
public void onNext(T item) {
synchronized (this) {
if (first) {
this.initialItem = item;
first = false;
return;
}
}
try {
if (item == null) {
done.complete(null);
} else {
consumer.consume(item, this);
}
} catch (Exception e) {
subscription.cancel();
subscription.thenAccept(Subscription::cancel);
done.completeExceptionally(e);
}
}
Expand All @@ -108,10 +137,6 @@ public void onError(Throwable throwable) {

@Override
public synchronized void onComplete() {
if (initialItem != null) {
this.isComplete = true;
return;
}
done.complete(null);
}

Expand All @@ -120,19 +145,7 @@ public synchronized void consume() {
if (done.isDone()) {
return;
}
try {
first = false;
if (initialItem != null) {
T item = initialItem;
initialItem = null;
onNext(item);
}
} finally {
if (isComplete) {
done.complete(null);
}
this.subscription.request(1);
}
this.subscription.thenAccept(s -> s.request(1));
}

@Override
Expand All @@ -142,7 +155,7 @@ public CompletableFuture<Void> done() {

@Override
public void cancel() {
subscription.cancel();
subscription.thenAccept(Subscription::cancel);
done.cancel(false);
}

Expand Down Expand Up @@ -240,52 +253,26 @@ public DerivedClientBuilder newBuilder() {
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest request, BodyConsumer<String> consumer) {
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest request, AsyncBody.Consumer<String> consumer) {
return sendAsync(request, () -> {
AsyncBodySubscriber<String> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromLineSubscriber(subscriber);
return new HandlerAndAsyncBody<>(handler, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
BodyHandler<AsyncBody> handlerAdapter = new BodyHandlerAdapter(subscriber, handler);
return new HandlerAndAsyncBody<>(handlerAdapter, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<>(r.response, r.asyncBody));
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest request, BodyConsumer<List<ByteBuffer>> consumer) {
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest request,
AsyncBody.Consumer<List<ByteBuffer>> consumer) {
return sendAsync(request, () -> {
AsyncBodySubscriber<List<ByteBuffer>> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromSubscriber(subscriber);
return new HandlerAndAsyncBody<>(handler, subscriber);
BodyHandler<AsyncBody> handlerAdapter = new BodyHandlerAdapter(subscriber, handler);
return new HandlerAndAsyncBody<>(handlerAdapter, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
}

@Override
public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request, Class<T> type) {
return sendAsync(request, () -> new HandlerAndAsyncBody<T>(toBodyHandler(type), null))
.thenApply(ar -> new JdkHttpResponseImpl<>(ar.response));
}

private <T> BodyHandler<T> toBodyHandler(Class<T> type) {
BodyHandler<T> bodyHandler;
if (type == null) {
bodyHandler = (BodyHandler<T>) BodyHandlers.discarding();
} else if (type == InputStream.class) {
bodyHandler = (BodyHandler<T>) BodyHandlers.ofInputStream();
} else if (type == String.class) {
bodyHandler = (BodyHandler<T>) BodyHandlers.ofString();
} else if (type == byte[].class) {
bodyHandler = (BodyHandler<T>) BodyHandlers.ofByteArray();
} else {
bodyHandler = responseInfo -> {
BodySubscriber<InputStream> upstream = BodyHandlers.ofInputStream().apply(responseInfo);

BodySubscriber<Reader> downstream = BodySubscribers.mapping(
upstream,
(InputStream is) -> new InputStreamReader(is, StandardCharsets.UTF_8));
return (BodySubscriber<T>) downstream;
};
}
return bodyHandler;
}

public <T> CompletableFuture<AsyncResponse<T>> sendAsync(HttpRequest request,
Supplier<HandlerAndAsyncBody<T>> handlerAndAsyncBodySupplier) {
JdkHttpRequestImpl jdkRequest = (JdkHttpRequestImpl) request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,46 @@
*/
package io.fabric8.kubernetes.client.jetty;

import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.AsyncBody;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.Callback;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;

public abstract class JettyAsyncResponseListener<T> extends Response.Listener.Adapter implements HttpClient.AsyncBody {
public abstract class JettyAsyncResponseListener extends Response.Listener.Adapter implements AsyncBody {

private final HttpRequest httpRequest;
private final HttpClient.BodyConsumer<T> bodyConsumer;
private final CompletableFuture<HttpResponse<HttpClient.AsyncBody>> asyncResponse;
private final CompletableFuture<HttpResponse<AsyncBody>> asyncResponse;
private final CompletableFuture<Void> asyncBodyDone;
private boolean consume;
private final CompletableFuture<LongConsumer> demand;
private boolean initialConsumeCalled;
private Runnable initialConsume;

JettyAsyncResponseListener(HttpRequest httpRequest, HttpClient.BodyConsumer<T> bodyConsumer) {
JettyAsyncResponseListener(HttpRequest httpRequest) {
this.httpRequest = httpRequest;
this.bodyConsumer = bodyConsumer;
asyncResponse = new CompletableFuture<>();
asyncBodyDone = new CompletableFuture<>();
consume = false;
demand = new CompletableFuture<>();
}

@Override
public synchronized void consume() {
consume = true;
this.notifyAll();
public void consume() {
synchronized (this) {
if (!this.initialConsumeCalled) {
this.initialConsumeCalled = true;
if (this.initialConsume != null) {
this.initialConsume.run();
this.initialConsume = null;
}
}
}
demand.thenAccept(l -> l.accept(1));
}

@Override
Expand All @@ -68,29 +77,37 @@ public void onComplete(Result result) {
asyncBodyDone.complete(null);
}

public CompletableFuture<HttpResponse<HttpClient.AsyncBody>> listen(Request request) {
public CompletableFuture<HttpResponse<AsyncBody>> listen(Request request) {
request.send(this);
return asyncResponse;
}

@Override
public void onContent(Response response, ByteBuffer content) {
try {
synchronized (this) {
while (!consume && !asyncBodyDone.isCancelled()) {
this.wait();
}
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback) {
synchronized (this) {
if (!initialConsumeCalled) {
// defer until consume is called
this.initialConsume = () -> onContent(response, demand, content, callback);
return;
}
if (!asyncBodyDone.isCancelled()) {
bodyConsumer.consume(process(response, content), this);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw KubernetesClientException.launderThrowable(e);
this.demand.complete(demand);
}
try {
onContent(content);
callback.succeeded();
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
callback.failed(e);
}
}

protected abstract T process(Response response, ByteBuffer content);
/**
* Implement to consume the content of the chunked response.
* <p>
* Each chunk will be passed <b>in order</b> to this function (<code>onContent{callback.succeeded}</code>)
*
* @param content the ByteBuffer containing a chunk of the response.
* @throws Exception in case the downstream consumer throws an exception.
*/
protected abstract void onContent(ByteBuffer content) throws Exception;

}