diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/jmx/EndpointMBean.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/jmx/EndpointMBean.java index bce34a41f9ad..5bd2031ad756 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/jmx/EndpointMBean.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/jmx/EndpointMBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import javax.management.MBeanInfo; import javax.management.ReflectionException; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.boot.actuate.endpoint.InvalidEndpointRequestException; @@ -172,6 +173,9 @@ public AttributeList setAttributes(AttributeList attributes) { private static class ReactiveHandler { static Object handle(Object result) { + if (result instanceof Flux) { + result = ((Flux) result).collectList(); + } if (result instanceof Mono) { return ((Mono) result).block(); } diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/web/jersey/JerseyEndpointResourceFactory.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/web/jersey/JerseyEndpointResourceFactory.java index f540699f744b..feb08cffb6ea 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/web/jersey/JerseyEndpointResourceFactory.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/web/jersey/JerseyEndpointResourceFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,7 @@ import org.glassfish.jersey.server.ContainerRequest; import org.glassfish.jersey.server.model.Resource; import org.glassfish.jersey.server.model.Resource.Builder; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.boot.actuate.endpoint.InvalidEndpointRequestException; @@ -128,6 +129,7 @@ private static final class OperationInflector implements Inflector> converters = new ArrayList<>(); converters.add(new ResourceBodyConverter()); if (ClassUtils.isPresent("reactor.core.publisher.Mono", OperationInflector.class.getClassLoader())) { + converters.add(new FluxBodyConverter()); converters.add(new MonoBodyConverter()); } BODY_CONVERTERS = Collections.unmodifiableList(converters); @@ -268,6 +270,21 @@ public Object apply(Object body) { } + /** + * Body converter from {@link Flux} to {@link Flux#collectList Mono<List>}. + */ + private static final class FluxBodyConverter implements Function { + + @Override + public Object apply(Object body) { + if (body instanceof Flux) { + return ((Flux) body).collectList(); + } + return body; + } + + } + /** * {@link Inflector} to for endpoint links. */ diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/web/reactive/AbstractWebFluxEndpointHandlerMapping.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/web/reactive/AbstractWebFluxEndpointHandlerMapping.java index 118066e1970a..3ae2ea0fb156 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/web/reactive/AbstractWebFluxEndpointHandlerMapping.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/web/reactive/AbstractWebFluxEndpointHandlerMapping.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import java.util.function.Supplier; import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -338,6 +339,9 @@ private Map getTemplateVariables(ServerWebExchange exchange) { } private Mono> handleResult(Publisher result, HttpMethod httpMethod) { + if (result instanceof Flux) { + result = ((Flux) result).collectList(); + } return Mono.from(result).map(this::toResponseEntity) .onErrorMap(InvalidEndpointRequestException.class, (ex) -> new ResponseStatusException(HttpStatus.BAD_REQUEST, ex.getReason())) diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/web/servlet/AbstractWebMvcEndpointHandlerMapping.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/web/servlet/AbstractWebMvcEndpointHandlerMapping.java index 669b53a791af..c7482f976e3e 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/web/servlet/AbstractWebMvcEndpointHandlerMapping.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/web/servlet/AbstractWebMvcEndpointHandlerMapping.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,16 +19,21 @@ import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.security.Principal; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import reactor.core.publisher.Flux; + import org.springframework.beans.factory.InitializingBean; import org.springframework.boot.actuate.endpoint.InvalidEndpointRequestException; import org.springframework.boot.actuate.endpoint.InvocationContext; @@ -49,6 +54,7 @@ import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.util.AntPathMatcher; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.RequestBody; @@ -274,6 +280,17 @@ private static class ServletWebOperationAdapter implements ServletWebOperation { private static final String PATH_SEPARATOR = AntPathMatcher.DEFAULT_PATH_SEPARATOR; + private static final List> BODY_CONVERTERS; + + static { + List> converters = new ArrayList<>(); + if (ClassUtils.isPresent("reactor.core.publisher.Flux", + ServletWebOperationAdapter.class.getClassLoader())) { + converters.add(new FluxBodyConverter()); + } + BODY_CONVERTERS = Collections.unmodifiableList(converters); + } + private final WebOperation operation; ServletWebOperationAdapter(WebOperation operation) { @@ -350,12 +367,32 @@ private Object handleResult(Object result, HttpMethod httpMethod) { (httpMethod != HttpMethod.GET) ? HttpStatus.NO_CONTENT : HttpStatus.NOT_FOUND); } if (!(result instanceof WebEndpointResponse)) { - return result; + return convertIfNecessary(result); } WebEndpointResponse response = (WebEndpointResponse) result; MediaType contentType = (response.getContentType() != null) ? new MediaType(response.getContentType()) : null; - return ResponseEntity.status(response.getStatus()).contentType(contentType).body(response.getBody()); + return ResponseEntity.status(response.getStatus()).contentType(contentType) + .body(convertIfNecessary(response.getBody())); + } + + private Object convertIfNecessary(Object body) { + for (Function converter : BODY_CONVERTERS) { + body = converter.apply(body); + } + return body; + } + + private static class FluxBodyConverter implements Function { + + @Override + public Object apply(Object body) { + if (!(body instanceof Flux)) { + return body; + } + return ((Flux) body).collectList(); + } + } } diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/jmx/EndpointMBeanTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/jmx/EndpointMBeanTests.java index bcebfa49a378..227229154c0d 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/jmx/EndpointMBeanTests.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/jmx/EndpointMBeanTests.java @@ -27,6 +27,7 @@ import javax.management.ReflectionException; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.beans.FatalBeanException; @@ -155,6 +156,15 @@ void invokeWhenMonoResultShouldBlockOnMono() throws MBeanException, ReflectionEx assertThat(result).isEqualTo("monoResult"); } + @Test + void invokeWhenFluxResultShouldCollectToMonoListAndBlockOnMono() throws MBeanException, ReflectionException { + TestExposableJmxEndpoint endpoint = new TestExposableJmxEndpoint( + new TestJmxOperation((arguments) -> Flux.just("flux", "result"))); + EndpointMBean bean = new EndpointMBean(this.responseMapper, null, endpoint); + Object result = bean.invoke("testOperation", NO_PARAMS, NO_SIGNATURE); + assertThat(result).asList().containsExactly("flux", "result"); + } + @Test void invokeShouldCallResponseMapper() throws MBeanException, ReflectionException { TestJmxOperationResponseMapper responseMapper = spy(this.responseMapper); diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/web/annotation/AbstractWebEndpointIntegrationTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/web/annotation/AbstractWebEndpointIntegrationTests.java index 0f5851073a2f..488c1c514c3a 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/web/annotation/AbstractWebEndpointIntegrationTests.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/web/annotation/AbstractWebEndpointIntegrationTests.java @@ -28,6 +28,7 @@ import java.util.function.Supplier; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.boot.actuate.endpoint.SecurityContext; @@ -269,6 +270,14 @@ void readOperationWithMonoResponse() { .isOk().expectBody().jsonPath("a").isEqualTo("alpha")); } + @Test + void readOperationWithFluxResponse() { + load(FluxResponseEndpointConfiguration.class, + (client) -> client.get().uri("/flux").exchange().expectStatus().isOk().expectBody().jsonPath("[0].a") + .isEqualTo("alpha").jsonPath("[1].b").isEqualTo("bravo").jsonPath("[2].c") + .isEqualTo("charlie")); + } + @Test void readOperationWithCustomMediaType() { load(CustomMediaTypesEndpointConfiguration.class, (client) -> client.get().uri("/custommediatypes").exchange() @@ -564,6 +573,17 @@ MonoResponseEndpoint testEndpoint(EndpointDelegate endpointDelegate) { } + @Configuration(proxyBeanMethods = false) + @Import(BaseConfiguration.class) + static class FluxResponseEndpointConfiguration { + + @Bean + FluxResponseEndpoint testEndpoint(EndpointDelegate endpointDelegate) { + return new FluxResponseEndpoint(); + } + + } + @Configuration(proxyBeanMethods = false) @Import(BaseConfiguration.class) static class CustomMediaTypesEndpointConfiguration { @@ -806,6 +826,17 @@ Mono> operation() { } + @Endpoint(id = "flux") + static class FluxResponseEndpoint { + + @ReadOperation + Flux> operation() { + return Flux.just(Collections.singletonMap("a", "alpha"), Collections.singletonMap("b", "bravo"), + Collections.singletonMap("c", "charlie")); + } + + } + @Endpoint(id = "custommediatypes") static class CustomMediaTypesEndpoint {