Skip to content

Commit

Permalink
Not using httpStatus() method for custom status codes
Browse files Browse the repository at this point in the history
curently whenever we try to retrieve the http status we get an exception for custom codes
we've migrated to using the raw status code. Also, due to spring-projects/spring-framework#23366 we had to manually wrap the ClientResponse so as we don't throw an exception on httpStatus() method

fixes gh-1393
  • Loading branch information
marcingrzejszczak committed Jul 26, 2019
1 parent d642ffd commit b59277f
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
Expand All @@ -41,12 +42,20 @@
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestClientException;
import org.springframework.web.reactive.function.BodyExtractor;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;

/**
Expand Down Expand Up @@ -290,17 +299,102 @@ public void onNext(ClientResponse response) {
this.done = true;
try {
// decorate response body
this.actual
.onNext(ClientResponse.from(response)
.body(response.bodyToFlux(DataBuffer.class)
.transform(this.scopePassingTransformer))
.build());
this.actual.onNext(wrapped(response));
}
finally {
terminateSpan(response, null);
}
}

// TODO: Remove once fixed
// https://github.com/spring-projects/spring-framework/issues/23366
private ClientResponse wrapped(ClientResponse response) {
return new ClientResponse() {
@Override
public HttpStatus statusCode() {
try {
return response.statusCode();
}
catch (IllegalArgumentException ex) {
return null;
}
}

@Override
public int rawStatusCode() {
return response.rawStatusCode();
}

@Override
public Headers headers() {
return response.headers();
}

@Override
public MultiValueMap<String, ResponseCookie> cookies() {
return response.cookies();
}

@Override
public ExchangeStrategies strategies() {
return response.strategies();
}

@Override
public <T> T body(
BodyExtractor<T, ? super ClientHttpResponse> extractor) {
return response.body(extractor);
}

@Override
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
return response.bodyToMono(elementClass);
}

@Override
public <T> Mono<T> bodyToMono(
ParameterizedTypeReference<T> typeReference) {
return response.bodyToMono(typeReference);
}

@Override
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
return (Flux<T>) response.bodyToFlux(DataBuffer.class)
.transform(scopePassingTransformer);
}

@Override
public <T> Flux<T> bodyToFlux(
ParameterizedTypeReference<T> typeReference) {
return (Flux<T>) response.bodyToFlux(DataBuffer.class)
.transform(scopePassingTransformer);
}

@Override
public <T> Mono<ResponseEntity<T>> toEntity(Class<T> bodyType) {
return response.toEntity(bodyType);
}

@Override
public <T> Mono<ResponseEntity<T>> toEntity(
ParameterizedTypeReference<T> typeReference) {
return response.toEntity(typeReference);
}

@Override
public <T> Mono<ResponseEntity<List<T>>> toEntityList(
Class<T> elementType) {
return response.toEntityList(elementType);
}

@Override
public <T> Mono<ResponseEntity<List<T>>> toEntityList(
ParameterizedTypeReference<T> typeReference) {
return response.toEntityList(typeReference);
}
};
}

@Override
public void onError(Throwable t) {
try {
Expand Down Expand Up @@ -346,30 +440,51 @@ void terminateSpanOnCancel() {

void terminateSpan(@Nullable ClientResponse clientResponse,
@Nullable Throwable throwable) {
if (clientResponse == null || clientResponse.statusCode() == null) {
if (clientResponse == null) {
if (log.isDebugEnabled()) {
log.debug("No response was returned. Will close the span ["
+ this.span + "]");
}
handleReceive(this.span, this.ws, clientResponse, throwable);
return;
}
boolean error = clientResponse.statusCode().is4xxClientError()
|| clientResponse.statusCode().is5xxServerError();
int statusCode = statusCodeAsInt(clientResponse);
boolean error = isError(statusCode);
if (error) {
if (log.isDebugEnabled()) {
log.debug(
"Non positive status code was returned from the call. Will close the span ["
+ this.span + "]");
}
throwable = new RestClientException("Status code of the response is ["
+ clientResponse.statusCode().value()
+ "] and the reason is ["
+ clientResponse.statusCode().getReasonPhrase() + "]");
+ statusCode + "] and the reason is ["
+ reasonPhrase(clientResponse) + "]");
}
handleReceive(this.span, this.ws, clientResponse, throwable);
}

private String reasonPhrase(ClientResponse clientResponse) {
try {
return clientResponse.statusCode().getReasonPhrase();
}
catch (IllegalArgumentException ex) {
return "";
}
}

private boolean isError(int code) {
return code >= 400;
}

private int statusCodeAsInt(ClientResponse response) {
try {
return response.rawStatusCode();
}
catch (Exception dontCare) {
return 0;
}
}

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,22 @@ public void shouldAttachTraceIdWhenCallingAnotherServiceViaWebClient() {
.contains("CLIENT");
}

@Test
@SuppressWarnings("unchecked")
public void shouldNotBreakWhenCustomStatusCodeIsSetViaWebClient() {
Span span = this.tracer.nextSpan().name("foo").start();

try (Tracer.SpanInScope ws = this.tracer.withSpanInScope(span)) {
this.webClient.get()
.uri("http://localhost:" + this.port + "/customstatuscode").exchange()
.block();
}
finally {
span.finish();
}
then(this.tracer.currentSpan()).isNull();
}

@Test
@Ignore("Flakey on CI")
public void shouldReportTraceForCancelledRequestViaWebClient() {
Expand Down Expand Up @@ -663,6 +679,12 @@ public String traceId(@RequestHeader(TRACE_ID_NAME) String traceId,
return traceId;
}

@RequestMapping(value = "/customstatuscode", method = RequestMethod.GET)
public ResponseEntity customStatusCode() {
this.span = this.tracer.currentSpan();
return ResponseEntity.status(499).build();
}

@RequestMapping("/")
public Map<String, String> home(@RequestHeader HttpHeaders headers) {
Map<String, String> map = new HashMap<>();
Expand Down

0 comments on commit b59277f

Please sign in to comment.