Skip to content

Commit

Permalink
Fix handling of Flux responses from Actuator endpoints
Browse files Browse the repository at this point in the history
Closes gh-30095
  • Loading branch information
wilkinsona committed Mar 10, 2022
1 parent 2f7feee commit 6eacc07
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 6 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@
import javax.management.MBeanInfo;
import javax.management.ReflectionException;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.boot.actuate.endpoint.InvalidEndpointRequestException;
Expand Down Expand Up @@ -172,6 +173,9 @@ public AttributeList setAttributes(AttributeList attributes) {
private static class ReactiveHandler {

static Object handle(Object result) {
if (result instanceof Flux) {
result = ((Flux<?>) result).collectList();
}
if (result instanceof Mono) {
return ((Mono<?>) result).block();
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2021 the original author or authors.
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,7 @@
import org.glassfish.jersey.server.ContainerRequest;
import org.glassfish.jersey.server.model.Resource;
import org.glassfish.jersey.server.model.Resource.Builder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.boot.actuate.endpoint.InvalidEndpointRequestException;
Expand Down Expand Up @@ -128,6 +129,7 @@ private static final class OperationInflector implements Inflector<ContainerRequ
List<Function<Object, Object>> converters = new ArrayList<>();
converters.add(new ResourceBodyConverter());
if (ClassUtils.isPresent("reactor.core.publisher.Mono", OperationInflector.class.getClassLoader())) {
converters.add(new FluxBodyConverter());
converters.add(new MonoBodyConverter());
}
BODY_CONVERTERS = Collections.unmodifiableList(converters);
Expand Down Expand Up @@ -268,6 +270,21 @@ public Object apply(Object body) {

}

/**
* Body converter from {@link Flux} to {@link Flux#collectList Mono&lt;List&gt;}.
*/
private static final class FluxBodyConverter implements Function<Object, Object> {

@Override
public Object apply(Object body) {
if (body instanceof Flux) {
return ((Flux<?>) body).collectList();
}
return body;
}

}

/**
* {@link Inflector} to for endpoint links.
*/
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2021 the original author or authors.
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import java.util.function.Supplier;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

Expand Down Expand Up @@ -338,6 +339,9 @@ private Map<String, String> getTemplateVariables(ServerWebExchange exchange) {
}

private Mono<ResponseEntity<Object>> handleResult(Publisher<?> result, HttpMethod httpMethod) {
if (result instanceof Flux) {
result = ((Flux<?>) result).collectList();
}
return Mono.from(result).map(this::toResponseEntity)
.onErrorMap(InvalidEndpointRequestException.class,
(ex) -> new ResponseStatusException(HttpStatus.BAD_REQUEST, ex.getReason()))
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2021 the original author or authors.
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,16 +19,21 @@
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import reactor.core.publisher.Flux;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.actuate.endpoint.InvalidEndpointRequestException;
import org.springframework.boot.actuate.endpoint.InvocationContext;
Expand All @@ -49,6 +54,7 @@
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RequestBody;
Expand Down Expand Up @@ -274,6 +280,17 @@ private static class ServletWebOperationAdapter implements ServletWebOperation {

private static final String PATH_SEPARATOR = AntPathMatcher.DEFAULT_PATH_SEPARATOR;

private static final List<Function<Object, Object>> BODY_CONVERTERS;

static {
List<Function<Object, Object>> converters = new ArrayList<>();
if (ClassUtils.isPresent("reactor.core.publisher.Flux",
ServletWebOperationAdapter.class.getClassLoader())) {
converters.add(new FluxBodyConverter());
}
BODY_CONVERTERS = Collections.unmodifiableList(converters);
}

private final WebOperation operation;

ServletWebOperationAdapter(WebOperation operation) {
Expand Down Expand Up @@ -350,12 +367,32 @@ private Object handleResult(Object result, HttpMethod httpMethod) {
(httpMethod != HttpMethod.GET) ? HttpStatus.NO_CONTENT : HttpStatus.NOT_FOUND);
}
if (!(result instanceof WebEndpointResponse)) {
return result;
return convertIfNecessary(result);
}
WebEndpointResponse<?> response = (WebEndpointResponse<?>) result;
MediaType contentType = (response.getContentType() != null) ? new MediaType(response.getContentType())
: null;
return ResponseEntity.status(response.getStatus()).contentType(contentType).body(response.getBody());
return ResponseEntity.status(response.getStatus()).contentType(contentType)
.body(convertIfNecessary(response.getBody()));
}

private Object convertIfNecessary(Object body) {
for (Function<Object, Object> converter : BODY_CONVERTERS) {
body = converter.apply(body);
}
return body;
}

private static class FluxBodyConverter implements Function<Object, Object> {

@Override
public Object apply(Object body) {
if (!(body instanceof Flux)) {
return body;
}
return ((Flux<?>) body).collectList();
}

}

}
Expand Down
Expand Up @@ -27,6 +27,7 @@
import javax.management.ReflectionException;

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.beans.FatalBeanException;
Expand Down Expand Up @@ -155,6 +156,15 @@ void invokeWhenMonoResultShouldBlockOnMono() throws MBeanException, ReflectionEx
assertThat(result).isEqualTo("monoResult");
}

@Test
void invokeWhenFluxResultShouldCollectToMonoListAndBlockOnMono() throws MBeanException, ReflectionException {
TestExposableJmxEndpoint endpoint = new TestExposableJmxEndpoint(
new TestJmxOperation((arguments) -> Flux.just("flux", "result")));
EndpointMBean bean = new EndpointMBean(this.responseMapper, null, endpoint);
Object result = bean.invoke("testOperation", NO_PARAMS, NO_SIGNATURE);
assertThat(result).asList().containsExactly("flux", "result");
}

@Test
void invokeShouldCallResponseMapper() throws MBeanException, ReflectionException {
TestJmxOperationResponseMapper responseMapper = spy(this.responseMapper);
Expand Down
Expand Up @@ -28,6 +28,7 @@
import java.util.function.Supplier;

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.boot.actuate.endpoint.SecurityContext;
Expand Down Expand Up @@ -269,6 +270,14 @@ void readOperationWithMonoResponse() {
.isOk().expectBody().jsonPath("a").isEqualTo("alpha"));
}

@Test
void readOperationWithFluxResponse() {
load(FluxResponseEndpointConfiguration.class,
(client) -> client.get().uri("/flux").exchange().expectStatus().isOk().expectBody().jsonPath("[0].a")
.isEqualTo("alpha").jsonPath("[1].b").isEqualTo("bravo").jsonPath("[2].c")
.isEqualTo("charlie"));
}

@Test
void readOperationWithCustomMediaType() {
load(CustomMediaTypesEndpointConfiguration.class, (client) -> client.get().uri("/custommediatypes").exchange()
Expand Down Expand Up @@ -564,6 +573,17 @@ MonoResponseEndpoint testEndpoint(EndpointDelegate endpointDelegate) {

}

@Configuration(proxyBeanMethods = false)
@Import(BaseConfiguration.class)
static class FluxResponseEndpointConfiguration {

@Bean
FluxResponseEndpoint testEndpoint(EndpointDelegate endpointDelegate) {
return new FluxResponseEndpoint();
}

}

@Configuration(proxyBeanMethods = false)
@Import(BaseConfiguration.class)
static class CustomMediaTypesEndpointConfiguration {
Expand Down Expand Up @@ -806,6 +826,17 @@ Mono<Map<String, String>> operation() {

}

@Endpoint(id = "flux")
static class FluxResponseEndpoint {

@ReadOperation
Flux<Map<String, String>> operation() {
return Flux.just(Collections.singletonMap("a", "alpha"), Collections.singletonMap("b", "bravo"),
Collections.singletonMap("c", "charlie"));
}

}

@Endpoint(id = "custommediatypes")
static class CustomMediaTypesEndpoint {

Expand Down

0 comments on commit 6eacc07

Please sign in to comment.