Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
poutsma committed Jan 22, 2024
1 parent baf923c commit 31886d8
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -134,7 +134,9 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
private Mono<ClientHttpResponse> execute(JettyClientHttpRequest request) {
return Mono.fromDirect(request.toReactiveRequest()
.response((reactiveResponse, chunkPublisher) -> {
Flux<DataBuffer> content = Flux.from(chunkPublisher).map(this::toDataBuffer);
Flux<DataBuffer> content = Flux.from(chunkPublisher).map(this::toDataBuffer)
.doOnSubscribe(subscription -> System.out.println("Subscribed: " + subscription))
.doOnCancel(() -> System.out.println("Canceled"));
return Mono.just(new JettyClientHttpResponse(reactiveResponse, content));
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,15 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntPredicate;
import java.util.function.Predicate;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.util.context.Context;

import org.springframework.core.ParameterizedTypeReference;
Expand Down Expand Up @@ -439,43 +435,44 @@ public <V> Flux<V> exchangeToFlux(Function<ClientResponse, ? extends Flux<V>> re
@Override
public Mono<ClientResponse> exchange() {
ClientRequest.Builder requestBuilder = initRequestBuilder();
ClientRequestObservationContext observationContext = new ClientRequestObservationContext(requestBuilder);
return Mono.deferContextual(contextView -> {
Observation observation = ClientHttpObservationDocumentation.HTTP_REACTIVE_CLIENT_EXCHANGES.observation(observationConvention,
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry);
observation
.parentObservation(contextView.getOrDefault(ObservationThreadLocalAccessor.KEY, null))
.start();
ExchangeFilterFunction filterFunction = new ObservationFilterFunction(observationContext);
if (filterFunctions != null) {
filterFunction = filterFunctions.andThen(filterFunction);
}
// ClientRequestObservationContext observationContext = new ClientRequestObservationContext(requestBuilder);
// return Mono.deferContextual(contextView -> {
// Observation observation = ClientHttpObservationDocumentation.HTTP_REACTIVE_CLIENT_EXCHANGES.observation(observationConvention,
// DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry);
// observation
// .parentObservation(contextView.getOrDefault(ObservationThreadLocalAccessor.KEY, null))
// .start();
// ExchangeFilterFunction filterFunction = new ObservationFilterFunction(observationContext);
// if (filterFunctions != null) {
// filterFunction = filterFunctions.andThen(filterFunction);
// }
// ExchangeFilterFunction filterFunction = new ObservationFilterFunction(observationContext);
ClientRequest request = requestBuilder
.attribute(ClientRequestObservationContext.CURRENT_OBSERVATION_CONTEXT_ATTRIBUTE, observationContext)
// .attribute(ClientRequestObservationContext.CURRENT_OBSERVATION_CONTEXT_ATTRIBUTE, observationContext)
.build();
observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null));
observationContext.setRequest(request);
Mono<ClientResponse> responseMono = filterFunction.apply(exchangeFunction)
.exchange(request)
// observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null));
// observationContext.setRequest(request);
Mono<ClientResponse> responseMono = /*filterFunction.apply(exchangeFunction)*/
exchangeFunction.exchange(request)
.checkpoint("Request to " +
WebClientUtils.getRequestDescription(request.method(), request.url()) +
" [DefaultWebClient]")
.switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR);
if (this.contextModifier != null) {
responseMono = responseMono.contextWrite(this.contextModifier);
}
final AtomicBoolean responseReceived = new AtomicBoolean();
return responseMono
.doOnNext(response -> responseReceived.set(true))
.doOnError(observationContext::setError)
.doFinally(signalType -> {
if (signalType == SignalType.CANCEL && !responseReceived.get()) {
observationContext.setAborted(true);
}
observation.stop();
})
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, observation));
});
// if (this.contextModifier != null) {
// responseMono = responseMono.contextWrite(this.contextModifier);
// }
// final AtomicBoolean responseReceived = new AtomicBoolean();
return responseMono;
// .doOnNext(response -> responseReceived.set(true))
// .doOnError(observationContext::setError)
// .doFinally(signalType -> {
// if (signalType == SignalType.CANCEL && !responseReceived.get()) {
// observationContext.setAborted(true);
// }
// observation.stop();
// })
// .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, observation));
// });
}

private ClientRequest.Builder initRequestBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@
import org.springframework.http.ResponseCookie;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.HttpComponentsClientHttpConnector;
import org.springframework.http.client.reactive.JdkClientHttpConnector;
import org.springframework.http.client.reactive.JettyClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.client.reactive.ReactorNetty2ClientHttpConnector;
Expand Down Expand Up @@ -102,10 +100,10 @@ class WebClientIntegrationTests {

static Stream<Named<ClientHttpConnector>> arguments() {
return Stream.of(
named("Reactor Netty", new ReactorClientHttpConnector()),
named("JDK", new JdkClientHttpConnector()),
named("Jetty", new JettyClientHttpConnector()),
named("HttpComponents", new HttpComponentsClientHttpConnector())
// named("Reactor Netty", new ReactorClientHttpConnector()),
// named("JDK", new JdkClientHttpConnector()),
named("Jetty", new JettyClientHttpConnector())
// named("HttpComponents", new HttpComponentsClientHttpConnector())
);
}

Expand Down

0 comments on commit 31886d8

Please sign in to comment.