Skip to content

Commit

Permalink
Merge pull request #4430 from shawkins/4201
Browse files Browse the repository at this point in the history
fix #4201: generalizing sendAsync support
  • Loading branch information
manusa committed Nov 29, 2022
2 parents 2f9f7ad + d2c2ae9 commit 432c48a
Show file tree
Hide file tree
Showing 28 changed files with 930 additions and 358 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Expand Up @@ -15,7 +15,8 @@

#### Improvements
* Fix #4355: for exec, attach, upload, and copy operations the container id/name will be validated or chosen prior to the remote call. You may also use the kubectl.kubernetes.io/default-container annotation to specify the default container.
* Fix #4530: generalizing the Serialization logic to allow for primitive values and clarifying the type expectations.
* Fix #4530: generalizing the Serialization logic to allow for primitive values and clarifying the type expectations.
* Fix #4201: Removed sendAsync from the individual http client implementations

#### Dependency Upgrade

Expand Down
Expand Up @@ -17,34 +17,31 @@
package io.fabric8.kubernetes.client.jdkhttp;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.http.AsyncBody;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
import io.fabric8.kubernetes.client.http.Interceptor;
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.kubernetes.client.http.WebSocket.Listener;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URI;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.BodySubscribers;
import java.net.http.HttpResponse.ResponseInfo;
import java.net.http.WebSocketHandshakeException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

Expand All @@ -55,48 +52,80 @@
*/
public class JdkHttpClientImpl implements HttpClient {

private final class AsyncBodySubscriber<T> implements Subscriber<T>, AsyncBody {
private final BodyConsumer<T> consumer;
/**
* Adapts the BodyHandler to immediately complete the body
*/
private static final class BodyHandlerAdapter implements BodyHandler<AsyncBody> {
private final AsyncBodySubscriber<?> subscriber;
private final BodyHandler<Void> handler;

private BodyHandlerAdapter(AsyncBodySubscriber<?> subscriber, BodyHandler<Void> handler) {
this.subscriber = subscriber;
this.handler = handler;
}

@Override
public BodySubscriber<AsyncBody> apply(ResponseInfo responseInfo) {
BodySubscriber<Void> bodySubscriber = handler.apply(responseInfo);
return new BodySubscriber<AsyncBody>() {
CompletableFuture<AsyncBody> cf = CompletableFuture.completedFuture(subscriber);

@Override
public void onSubscribe(Subscription subscription) {
bodySubscriber.onSubscribe(subscription);
}

@Override
public void onNext(List<ByteBuffer> item) {
bodySubscriber.onNext(item);
}

@Override
public void onError(Throwable throwable) {
bodySubscriber.onError(throwable);
}

@Override
public void onComplete() {
bodySubscriber.onComplete();
}

@Override
public CompletionStage<AsyncBody> getBody() {
return cf;
}
};
}
}

private static final class AsyncBodySubscriber<T> implements Subscriber<T>, AsyncBody {
private final AsyncBody.Consumer<T> consumer;
private CompletableFuture<Void> done = new CompletableFuture<Void>();
private final AtomicBoolean subscribed = new AtomicBoolean();
private volatile Flow.Subscription subscription;
private T initialItem;
private boolean first = true;
private boolean isComplete;
private CompletableFuture<Flow.Subscription> subscription = new CompletableFuture<>();

private AsyncBodySubscriber(BodyConsumer<T> consumer) {
private AsyncBodySubscriber(AsyncBody.Consumer<T> consumer) {
this.consumer = consumer;
}

@Override
public void onSubscribe(Subscription subscription) {
if (!subscribed.compareAndSet(false, true)) {
if (this.subscription.isDone()) {
subscription.cancel();
return;
}
this.subscription = subscription;
// the sendAsync future won't complete unless we do the initial request here
// so in onNext we'll trap the item until we're ready
subscription.request(1);
this.subscription.complete(subscription);
}

@Override
public void onNext(T item) {
synchronized (this) {
if (first) {
this.initialItem = item;
first = false;
return;
}
}
try {
if (item == null) {
done.complete(null);
} else {
consumer.consume(item, this);
}
} catch (Exception e) {
subscription.cancel();
subscription.thenAccept(Subscription::cancel);
done.completeExceptionally(e);
}
}
Expand All @@ -108,10 +137,6 @@ public void onError(Throwable throwable) {

@Override
public synchronized void onComplete() {
if (initialItem != null) {
this.isComplete = true;
return;
}
done.complete(null);
}

Expand All @@ -120,19 +145,7 @@ public synchronized void consume() {
if (done.isDone()) {
return;
}
try {
first = false;
if (initialItem != null) {
T item = initialItem;
initialItem = null;
onNext(item);
}
} finally {
if (isComplete) {
done.complete(null);
}
this.subscription.request(1);
}
this.subscription.thenAccept(s -> s.request(1));
}

@Override
Expand All @@ -142,7 +155,7 @@ public CompletableFuture<Void> done() {

@Override
public void cancel() {
subscription.cancel();
subscription.thenAccept(Subscription::cancel);
done.cancel(false);
}

Expand Down Expand Up @@ -240,52 +253,26 @@ public DerivedClientBuilder newBuilder() {
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest request, BodyConsumer<String> consumer) {
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest request, AsyncBody.Consumer<String> consumer) {
return sendAsync(request, () -> {
AsyncBodySubscriber<String> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromLineSubscriber(subscriber);
return new HandlerAndAsyncBody<>(handler, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
BodyHandler<AsyncBody> handlerAdapter = new BodyHandlerAdapter(subscriber, handler);
return new HandlerAndAsyncBody<>(handlerAdapter, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<>(r.response, r.asyncBody));
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest request, BodyConsumer<List<ByteBuffer>> consumer) {
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest request,
AsyncBody.Consumer<List<ByteBuffer>> consumer) {
return sendAsync(request, () -> {
AsyncBodySubscriber<List<ByteBuffer>> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromSubscriber(subscriber);
return new HandlerAndAsyncBody<>(handler, subscriber);
BodyHandler<AsyncBody> handlerAdapter = new BodyHandlerAdapter(subscriber, handler);
return new HandlerAndAsyncBody<>(handlerAdapter, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
}

@Override
public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request, Class<T> type) {
return sendAsync(request, () -> new HandlerAndAsyncBody<T>(toBodyHandler(type), null))
.thenApply(ar -> new JdkHttpResponseImpl<>(ar.response));
}

private <T> BodyHandler<T> toBodyHandler(Class<T> type) {
BodyHandler<T> bodyHandler;
if (type == null) {
bodyHandler = (BodyHandler<T>) BodyHandlers.discarding();
} else if (type == InputStream.class) {
bodyHandler = (BodyHandler<T>) BodyHandlers.ofInputStream();
} else if (type == String.class) {
bodyHandler = (BodyHandler<T>) BodyHandlers.ofString();
} else if (type == byte[].class) {
bodyHandler = (BodyHandler<T>) BodyHandlers.ofByteArray();
} else {
bodyHandler = responseInfo -> {
BodySubscriber<InputStream> upstream = BodyHandlers.ofInputStream().apply(responseInfo);

BodySubscriber<Reader> downstream = BodySubscribers.mapping(
upstream,
(InputStream is) -> new InputStreamReader(is, StandardCharsets.UTF_8));
return (BodySubscriber<T>) downstream;
};
}
return bodyHandler;
}

public <T> CompletableFuture<AsyncResponse<T>> sendAsync(HttpRequest request,
Supplier<HandlerAndAsyncBody<T>> handlerAndAsyncBodySupplier) {
JdkHttpRequestImpl jdkRequest = (JdkHttpRequestImpl) request;
Expand Down
Expand Up @@ -15,37 +15,46 @@
*/
package io.fabric8.kubernetes.client.jetty;

import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.AsyncBody;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.Callback;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;

public abstract class JettyAsyncResponseListener<T> extends Response.Listener.Adapter implements HttpClient.AsyncBody {
public abstract class JettyAsyncResponseListener extends Response.Listener.Adapter implements AsyncBody {

private final HttpRequest httpRequest;
private final HttpClient.BodyConsumer<T> bodyConsumer;
private final CompletableFuture<HttpResponse<HttpClient.AsyncBody>> asyncResponse;
private final CompletableFuture<HttpResponse<AsyncBody>> asyncResponse;
private final CompletableFuture<Void> asyncBodyDone;
private boolean consume;
private final CompletableFuture<LongConsumer> demand;
private boolean initialConsumeCalled;
private Runnable initialConsume;

JettyAsyncResponseListener(HttpRequest httpRequest, HttpClient.BodyConsumer<T> bodyConsumer) {
JettyAsyncResponseListener(HttpRequest httpRequest) {
this.httpRequest = httpRequest;
this.bodyConsumer = bodyConsumer;
asyncResponse = new CompletableFuture<>();
asyncBodyDone = new CompletableFuture<>();
consume = false;
demand = new CompletableFuture<>();
}

@Override
public synchronized void consume() {
consume = true;
this.notifyAll();
public void consume() {
synchronized (this) {
if (!this.initialConsumeCalled) {
this.initialConsumeCalled = true;
if (this.initialConsume != null) {
this.initialConsume.run();
this.initialConsume = null;
}
}
}
demand.thenAccept(l -> l.accept(1));
}

@Override
Expand All @@ -68,29 +77,37 @@ public void onComplete(Result result) {
asyncBodyDone.complete(null);
}

public CompletableFuture<HttpResponse<HttpClient.AsyncBody>> listen(Request request) {
public CompletableFuture<HttpResponse<AsyncBody>> listen(Request request) {
request.send(this);
return asyncResponse;
}

@Override
public void onContent(Response response, ByteBuffer content) {
try {
synchronized (this) {
while (!consume && !asyncBodyDone.isCancelled()) {
this.wait();
}
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback) {
synchronized (this) {
if (!initialConsumeCalled) {
// defer until consume is called
this.initialConsume = () -> onContent(response, demand, content, callback);
return;
}
if (!asyncBodyDone.isCancelled()) {
bodyConsumer.consume(process(response, content), this);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw KubernetesClientException.launderThrowable(e);
this.demand.complete(demand);
}
try {
onContent(content);
callback.succeeded();
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
callback.failed(e);
}
}

protected abstract T process(Response response, ByteBuffer content);
/**
* Implement to consume the content of the chunked response.
* <p>
* Each chunk will be passed <b>in order</b> to this function (<code>onContent{callback.succeeded}</code>)
*
* @param content the ByteBuffer containing a chunk of the response.
* @throws Exception in case the downstream consumer throws an exception.
*/
protected abstract void onContent(ByteBuffer content) throws Exception;

}

0 comments on commit 432c48a

Please sign in to comment.