Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #4201: generalizing sendAsync support #4430

Merged
merged 10 commits into from
Nov 29, 2022
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

#### Improvements
* Fix #4355: for exec, attach, upload, and copy operations the container id/name will be validated or chosen prior to the remote call. You may also use the kubectl.kubernetes.io/default-container annotation to specify the default container.
* Fix #4530: generalizing the Serialization logic to allow for primitive values and clarifying the type expectations.
* Fix #4530: generalizing the Serialization logic to allow for primitive values and clarifying the type expectations.
* Fix #4201: Removed sendAsync from the individual http client implementations

#### Dependency Upgrade

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,23 @@
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.kubernetes.client.http.WebSocket.Listener;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URI;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.BodySubscribers;
import java.net.http.HttpResponse.ResponseInfo;
import java.net.http.WebSocketHandshakeException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

Expand All @@ -55,48 +51,80 @@
*/
public class JdkHttpClientImpl implements HttpClient {

/**
* Adapts the BodyHandler to immediately complete the body
*/
private static final class BodyHandlerAdapter implements BodyHandler<AsyncBody> {
private final AsyncBodySubscriber<?> subscriber;
private final BodyHandler<Void> handler;

private BodyHandlerAdapter(AsyncBodySubscriber<?> subscriber, BodyHandler<Void> handler) {
this.subscriber = subscriber;
this.handler = handler;
}

@Override
public BodySubscriber<AsyncBody> apply(ResponseInfo responseInfo) {
BodySubscriber<Void> bodySubscriber = handler.apply(responseInfo);
return new BodySubscriber<AsyncBody>() {
CompletableFuture<AsyncBody> cf = CompletableFuture.completedFuture(subscriber);

@Override
public void onSubscribe(Subscription subscription) {
bodySubscriber.onSubscribe(subscription);
}

@Override
public void onNext(List<ByteBuffer> item) {
bodySubscriber.onNext(item);
}

@Override
public void onError(Throwable throwable) {
bodySubscriber.onError(throwable);
}

@Override
public void onComplete() {
bodySubscriber.onComplete();
}

@Override
public CompletionStage<AsyncBody> getBody() {
return cf;
}
};
}
}

private final class AsyncBodySubscriber<T> implements Subscriber<T>, AsyncBody {
private final BodyConsumer<T> consumer;
private CompletableFuture<Void> done = new CompletableFuture<Void>();
private final AtomicBoolean subscribed = new AtomicBoolean();
private volatile Flow.Subscription subscription;
private T initialItem;
private boolean first = true;
private boolean isComplete;
private CompletableFuture<Flow.Subscription> subscription = new CompletableFuture<>();

private AsyncBodySubscriber(BodyConsumer<T> consumer) {
this.consumer = consumer;
}

@Override
public void onSubscribe(Subscription subscription) {
if (!subscribed.compareAndSet(false, true)) {
if (this.subscription.isDone()) {
subscription.cancel();
return;
}
this.subscription = subscription;
// the sendAsync future won't complete unless we do the initial request here
// so in onNext we'll trap the item until we're ready
subscription.request(1);
this.subscription.complete(subscription);
}

@Override
public void onNext(T item) {
synchronized (this) {
if (first) {
this.initialItem = item;
first = false;
return;
}
}
try {
if (item == null) {
done.complete(null);
} else {
consumer.consume(item, this);
}
} catch (Exception e) {
subscription.cancel();
subscription.thenAccept(Subscription::cancel);
done.completeExceptionally(e);
}
}
Expand All @@ -108,10 +136,6 @@ public void onError(Throwable throwable) {

@Override
public synchronized void onComplete() {
if (initialItem != null) {
this.isComplete = true;
return;
}
done.complete(null);
}

Expand All @@ -120,19 +144,7 @@ public synchronized void consume() {
if (done.isDone()) {
return;
}
try {
first = false;
if (initialItem != null) {
T item = initialItem;
initialItem = null;
onNext(item);
}
} finally {
if (isComplete) {
done.complete(null);
}
this.subscription.request(1);
}
this.subscription.thenAccept(s -> s.request(1));
}

@Override
Expand All @@ -142,7 +154,7 @@ public CompletableFuture<Void> done() {

@Override
public void cancel() {
subscription.cancel();
subscription.thenAccept(Subscription::cancel);
done.cancel(false);
}

Expand Down Expand Up @@ -244,7 +256,8 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest reque
return sendAsync(request, () -> {
AsyncBodySubscriber<String> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromLineSubscriber(subscriber);
return new HandlerAndAsyncBody<>(handler, subscriber);
BodyHandler<AsyncBody> handlerAdapter = new BodyHandlerAdapter(subscriber, handler);
return new HandlerAndAsyncBody<>(handlerAdapter, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
}

Expand All @@ -253,39 +266,11 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest reque
return sendAsync(request, () -> {
AsyncBodySubscriber<List<ByteBuffer>> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromSubscriber(subscriber);
return new HandlerAndAsyncBody<>(handler, subscriber);
BodyHandler<AsyncBody> handlerAdapter = new BodyHandlerAdapter(subscriber, handler);
return new HandlerAndAsyncBody<>(handlerAdapter, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
}

@Override
public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request, Class<T> type) {
return sendAsync(request, () -> new HandlerAndAsyncBody<T>(toBodyHandler(type), null))
.thenApply(ar -> new JdkHttpResponseImpl<>(ar.response));
}

private <T> BodyHandler<T> toBodyHandler(Class<T> type) {
BodyHandler<T> bodyHandler;
if (type == null) {
bodyHandler = (BodyHandler<T>) BodyHandlers.discarding();
} else if (type == InputStream.class) {
bodyHandler = (BodyHandler<T>) BodyHandlers.ofInputStream();
} else if (type == String.class) {
bodyHandler = (BodyHandler<T>) BodyHandlers.ofString();
} else if (type == byte[].class) {
bodyHandler = (BodyHandler<T>) BodyHandlers.ofByteArray();
} else {
bodyHandler = responseInfo -> {
BodySubscriber<InputStream> upstream = BodyHandlers.ofInputStream().apply(responseInfo);

BodySubscriber<Reader> downstream = BodySubscribers.mapping(
upstream,
(InputStream is) -> new InputStreamReader(is, StandardCharsets.UTF_8));
return (BodySubscriber<T>) downstream;
};
}
return bodyHandler;
}

public <T> CompletableFuture<AsyncResponse<T>> sendAsync(HttpRequest request,
Supplier<HandlerAndAsyncBody<T>> handlerAndAsyncBodySupplier) {
JdkHttpRequestImpl jdkRequest = (JdkHttpRequestImpl) request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,47 @@
*/
package io.fabric8.kubernetes.client.jetty;

import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.Callback;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;

public abstract class JettyAsyncResponseListener<T> extends Response.Listener.Adapter implements HttpClient.AsyncBody {

private final HttpRequest httpRequest;
private final HttpClient.BodyConsumer<T> bodyConsumer;
private final CompletableFuture<HttpResponse<HttpClient.AsyncBody>> asyncResponse;
private final CompletableFuture<Void> asyncBodyDone;
private boolean consume;
private CompletableFuture<LongConsumer> demand = new CompletableFuture<>();
private boolean initialConsumeCalled;
private Runnable initialConsume;

JettyAsyncResponseListener(HttpRequest httpRequest, HttpClient.BodyConsumer<T> bodyConsumer) {
this.httpRequest = httpRequest;
this.bodyConsumer = bodyConsumer;
asyncResponse = new CompletableFuture<>();
asyncBodyDone = new CompletableFuture<>();
consume = false;
}

@Override
public synchronized void consume() {
consume = true;
this.notifyAll();
public void consume() {
synchronized (this) {
if (!this.initialConsumeCalled) {
this.initialConsumeCalled = true;
if (this.initialConsume != null) {
this.initialConsume.run();
this.initialConsume = null;
}
}
}
demand.thenAccept(l -> l.accept(1));
}

@Override
Expand Down Expand Up @@ -74,23 +84,30 @@ public CompletableFuture<HttpResponse<HttpClient.AsyncBody>> listen(Request requ
}

@Override
public void onContent(Response response, ByteBuffer content) {
try {
synchronized (this) {
while (!consume && !asyncBodyDone.isCancelled()) {
this.wait();
}
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback) {
synchronized (this) {
if (!initialConsumeCalled) {
// defer until consume is called
this.initialConsume = () -> onContent(response, demand, content, callback);
return;
}
if (!asyncBodyDone.isCancelled()) {
bodyConsumer.consume(process(response, content), this);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw KubernetesClientException.launderThrowable(e);
this.demand.complete(demand);
}
try {
// we must clone as the buffer can be reused after the call to succeeded
bodyConsumer.consume(process(response, clone(content)), this);
callback.succeeded();
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
callback.failed(e);
}
}

protected abstract T process(Response response, ByteBuffer content);

public static ByteBuffer clone(ByteBuffer original) {
ByteBuffer clone = ByteBuffer.allocate(original.remaining());
clone.put(original);
clone.flip();
return clone;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.InputStreamRequestContent;
import org.eclipse.jetty.client.util.StringRequestContent;
import org.eclipse.jetty.websocket.client.WebSocketClient;
Expand All @@ -41,7 +39,6 @@

import static io.fabric8.kubernetes.client.http.StandardMediaTypes.APPLICATION_OCTET_STREAM;
import static io.fabric8.kubernetes.client.http.StandardMediaTypes.TEXT_PLAIN;
import static org.eclipse.jetty.util.BufferUtil.toArray;

public class JettyHttpClient implements io.fabric8.kubernetes.client.http.HttpClient {

Expand Down Expand Up @@ -77,39 +74,10 @@ public DerivedClientBuilder newBuilder() {
return builder.copy(this);
}

@Override
public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest originalRequest, Class<T> type) {
final var supportedResponse = JettyHttpResponse.SupportedResponse.from(type);
final var request = toStandardHttpRequest(originalRequest);
final CompletableFuture<HttpResponse<T>> future = new CompletableFuture<>();
newRequest(request).send(new BufferingResponseListener() {

// TODO: Long Term Refactor - This Listener blocks until the full response is read and keeps it in memory.
// Find a way to stream the response body without completing the future
// We need two signals, one when the response is received, and one when the body is completely
// read.
// Should this method be completely replaced by consumeXxx()?
@Override
public void onComplete(Result result) {
future.complete(new JettyHttpResponse<>(
request, result.getResponse(), supportedResponse.process(result.getResponse(), getContent(), type)));
}
});
return interceptResponse(request.toBuilder(), future, r -> sendAsync(r, type));
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(
HttpRequest originalRequest, BodyConsumer<String> consumer) {
final var request = toStandardHttpRequest(originalRequest);
final var future = new JettyAsyncResponseListener<>(request, consumer) {

@Override
protected String process(Response response, ByteBuffer content) {
return JettyHttpResponse.SupportedResponse.TEXT.process(response, toArray(content), String.class);
}
}.listen(newRequest(request));
return interceptResponse(request.toBuilder(), future, r -> consumeLines(r, consumer));
throw new UnsupportedOperationException("Not supported by the Jetty client");
}

@Override
Expand Down