Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added custom tags for MVC metrics #572

Merged
merged 12 commits into from Mar 1, 2021
2 changes: 1 addition & 1 deletion server/build.gradle
Expand Up @@ -79,7 +79,7 @@ configure(javaProjects) {
dependency("com.fasterxml.jackson.core:jackson-core:2.11.0")
dependency("com.fasterxml.jackson.core:jackson-annotations:2.11.0")

dependency("io.micrometer:micrometer-registry-prometheus:1.6.2")
dependency("io.micrometer:micrometer-registry-prometheus:1.6.4")

dependency("joda-time:joda-time:2.8.1")

Expand Down
1 change: 1 addition & 0 deletions server/pxf-service/build.gradle
Expand Up @@ -63,6 +63,7 @@ dependencies {

developmentOnly 'org.springframework.boot:spring-boot-devtools'
testImplementation('org.springframework.boot:spring-boot-starter-test')
testImplementation('org.springframework.boot:spring-boot-starter-webflux')
}

bootJar {
Expand Down
@@ -1,10 +1,14 @@
package org.greenplum.pxf.service;

import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import org.apache.commons.lang.StringUtils;
import org.greenplum.pxf.api.configuration.PxfServerProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.actuate.metrics.web.servlet.WebMvcTagsContributor;
import org.springframework.boot.autoconfigure.task.TaskExecutionProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.task.TaskExecutorBuilder;
Expand All @@ -19,6 +23,10 @@
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.ArrayList;

/**
* Configures the {@link AsyncTaskExecutor} for tasks that will stream data to
* clients
Expand All @@ -27,12 +35,11 @@
@EnableConfigurationProperties(PxfServerProperties.class)
public class PxfConfiguration implements WebMvcConfigurer {

private static final Logger LOG = LoggerFactory.getLogger(PxfConfiguration.class);

/**
* Bean name of PXF's {@link TaskExecutor}.
*/
public static final String PXF_RESPONSE_STREAM_TASK_EXECUTOR = "pxfResponseStreamTaskExecutor";
private static final Logger LOG = LoggerFactory.getLogger(PxfConfiguration.class);

private final ListableBeanFactory beanFactory;

Expand Down Expand Up @@ -100,4 +107,45 @@ public ThreadPoolTaskExecutor pxfApplicationTaskExecutor(PxfServerProperties pxf

return builder.build(PxfThreadPoolTaskExecutor.class);
}

/**
* Custom {@link WebMvcTagsContributor} that adds PXF specific tags to metrics for Spring MVC (REST endpoints)
*
* @return the {@link WebMvcTagsContributor} instance
*/
@Bean
public WebMvcTagsContributor webMvcTagsContributor() {
return new WebMvcTagsContributor() {

private static final String UNKNOWN_VALUE = "unknown";

@Override
public Iterable<Tag> getTags(HttpServletRequest request, HttpServletResponse response, Object handler, Throwable exception) {
// default server tag value to "default" if the request is from a PXF Client
// if request is not from PXF client, apply the same tags wth the value "unknown"
// because the Prometheus Metrics Registry requires a metric to have a consistent set of tags
String defaultServer = StringUtils.isNotBlank(request.getHeader("X-GP-USER")) ? "default" : UNKNOWN_VALUE;
Tags tags = Tags.empty();
tags = addTag("user", request.getHeader("X-GP-USER"), tags, UNKNOWN_VALUE);
tags = addTag("segment", request.getHeader("X-GP-SEGMENT-ID"), tags, UNKNOWN_VALUE);
tags = addTag("profile", request.getHeader("X-GP-OPTIONS-PROFILE"), tags, UNKNOWN_VALUE);
tags = addTag("server", request.getHeader("X-GP-OPTIONS-SERVER"), tags, defaultServer);
return tags;
}

@Override
public Iterable<Tag> getLongRequestTags(HttpServletRequest request, Object handler) {
return new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you try with long requests tags ? maybe there is no issue if we mark our controller as long request?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think our requests are long requests, and we should probably mark them as such, and we should probably implement this method instead. Both read and write are considered long requests according to the documentation. All other requests (actuator, health) won't have the headers anyway, so nothing to add there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't yet understand the difference, I can't believe the only difference would be some arbitrary duration boundary, need more research there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we return Tags.empty() here, if we are not going to use it?

}

private Tags addTag(String tag, String value, Tags tags, String defaultValue) {
value = StringUtils.defaultIfBlank(value, defaultValue);
if (StringUtils.isNotBlank(value)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this check is no longer necessary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, yes, thank you for the catch, now I can remove the private method !

tags = tags.and(tag, value);
}
return tags;
}
};
}

}
4 changes: 2 additions & 2 deletions server/pxf-service/src/main/resources/application.properties
Expand Up @@ -41,5 +41,5 @@ pxf.task.pool.max-size=${pxf.max.threads:200}
pxf.task.pool.queue-capacity=0

# logging
logging.file.name=${pxf.logdir:/var/log}/pxf-service.log
logging.file.path=${pxf.logdir:/var/log}
logging.file.name=${pxf.logdir:/tmp}/pxf-service.log
logging.file.path=${pxf.logdir:/tmp}
@@ -1,4 +1,92 @@
package org.greenplum.pxf.service;

import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import org.apache.commons.collections.CollectionUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.boot.actuate.metrics.web.servlet.WebMvcTagsContributor;

import javax.servlet.http.HttpServletRequest;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;

@ExtendWith(MockitoExtension.class)
class PxfConfigurationTest {

private PxfConfiguration configuration;
private WebMvcTagsContributor contributor;
private HttpServletRequest mockRequest;

@BeforeEach
public void setup() {
configuration = new PxfConfiguration(null);
contributor = configuration.webMvcTagsContributor();
mockRequest = mock(HttpServletRequest.class, withSettings().lenient());
}

@Test
public void testPxfWebMvcTagsContributor_pxfEndpoint_namedServer() {
when(mockRequest.getHeader("X-GP-USER")).thenReturn("Alex");
when(mockRequest.getHeader("X-GP-SEGMENT-ID")).thenReturn("5");
when(mockRequest.getHeader("X-GP-OPTIONS-PROFILE")).thenReturn("test:text");
when(mockRequest.getHeader("X-GP-OPTIONS-SERVER")).thenReturn("test_server");

List<Tag> expectedTags = Tags.of("user", "Alex")
.and("segment", "5")
.and("profile", "test:text")
.and("server", "test_server")
.stream().collect(Collectors.toList());

Iterable<Tag> tagsIterable = contributor.getTags(mockRequest, null, null, null);
List<Tag> tags = StreamSupport.stream(tagsIterable.spliterator(), false).collect(Collectors.toList());

assertTrue(CollectionUtils.isEqualCollection(expectedTags, tags));
assertFalse(contributor.getLongRequestTags(mockRequest, null).iterator().hasNext());
}

@Test
public void testPxfWebMvcTagsContributor_pxfEndpoint_defaultServer() {
when(mockRequest.getHeader("X-GP-USER")).thenReturn("Alex");
when(mockRequest.getHeader("X-GP-SEGMENT-ID")).thenReturn("5");
when(mockRequest.getHeader("X-GP-OPTIONS-PROFILE")).thenReturn("test:text");
when(mockRequest.getHeader("X-GP-OPTIONS-SERVER")).thenReturn(null);

List<Tag> expectedTags = Tags.of("user", "Alex")
.and("segment", "5")
.and("profile", "test:text")
.and("server", "default")
.stream().collect(Collectors.toList());

Iterable<Tag> tagsIterable = contributor.getTags(mockRequest, null, null, null);
List<Tag> tags = StreamSupport.stream(tagsIterable.spliterator(), false).collect(Collectors.toList());

assertTrue(CollectionUtils.isEqualCollection(expectedTags, tags));
assertFalse(contributor.getLongRequestTags(mockRequest, null).iterator().hasNext());
}

@Test
public void testPxfWebMvcTagsContributor_nonPxfEndpoint() {
List<Tag> expectedTags = Tags.of("user", "unknown")
.and("segment", "unknown")
.and("profile", "unknown")
.and("server", "unknown")
.stream().collect(Collectors.toList());

Iterable<Tag> tagsIterable = contributor.getTags(mockRequest, null, null, null);
List<Tag> tags = StreamSupport.stream(tagsIterable.spliterator(), false).collect(Collectors.toList());

assertTrue(CollectionUtils.isEqualCollection(expectedTags, tags));
assertFalse(contributor.getLongRequestTags(mockRequest, null).iterator().hasNext());
}

}
@@ -0,0 +1,139 @@
package org.greenplum.pxf.service;

import com.google.common.base.Charsets;
import org.greenplum.pxf.api.model.RequestContext;
import org.greenplum.pxf.service.controller.ReadService;
import org.greenplum.pxf.service.controller.WriteService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.stubbing.Answer;
import org.springframework.boot.test.autoconfigure.actuate.metrics.AutoConfigureMetrics;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.util.MultiValueMap;

import java.io.IOException;
import java.io.OutputStream;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = PxfServiceApplication.class)
@AutoConfigureMetrics
public class PxfMetricsIT {

@LocalServerPort
private int port;

@MockBean
private RequestParser<MultiValueMap<String, String>> mockParser;

@MockBean
private ReadService readService;

@MockBean
private WriteService mockWriteService;

@Mock
private RequestContext mockContext;

private WebTestClient client;

@BeforeEach
public void setUp() {
client = WebTestClient.bindToServer().baseUrl("http://localhost:" + port).build();
}

@Test
public void test_HttpServerRequests_Metric() throws Exception {
mockServices();
// call PXF read API
client.get().uri("/pxf/read")
.header("X-GP-USER", "reader")
.header("X-GP-SEGMENT-ID", "77")
.header("X-GP-OPTIONS-PROFILE", "profile:test")
.header("X-GP-OPTIONS-SERVER", "speedy")
.exchange().expectStatus().isOk()
.expectBody(String.class).isEqualTo("Hello from read!");

// assert metric got reported with proper tags
client.get().uri("/actuator/metrics/http.server.requests?tag=uri:/pxf/read")
.exchange().expectStatus().isOk().expectBody()
.jsonPath("$.measurements[?(@.statistic == 'COUNT')].value").isEqualTo(1.0)
.jsonPath("$.measurements[?(@.statistic == 'TOTAL_TIME' && @.value < 0.5)]").doesNotHaveJsonPath()
.jsonPath("$.availableTags[?(@.tag == 'application')].values[0]").isEqualTo("pxf-service")
.jsonPath("$.availableTags[?(@.tag == 'user')].values[0]").isEqualTo("reader")
.jsonPath("$.availableTags[?(@.tag == 'segment')].values[0]").isEqualTo("77")
.jsonPath("$.availableTags[?(@.tag == 'profile')].values[0]").isEqualTo("profile:test")
.jsonPath("$.availableTags[?(@.tag == 'server')].values[0]").isEqualTo("speedy");

// call PXF write API
client.post().uri("/pxf/write")
.header("X-GP-USER", "writer")
.header("X-GP-SEGMENT-ID", "77")
.header("X-GP-OPTIONS-PROFILE", "profile:test")
.header("X-GP-OPTIONS-SERVER", "speedy")
.exchange().expectStatus().isOk()
.expectBody(String.class).isEqualTo("Hello from write!");

// assert metric got reported with proper tags
client.get().uri("/actuator/metrics/http.server.requests?tag=uri:/pxf/write")
.exchange().expectStatus().isOk().expectBody()
.jsonPath("$.measurements[?(@.statistic == 'COUNT')].value").isEqualTo(1.0)
.jsonPath("$.availableTags[?(@.tag == 'application')].values[0]").isEqualTo("pxf-service")
.jsonPath("$.availableTags[?(@.tag == 'user')].values[0]").isEqualTo("writer")
.jsonPath("$.availableTags[?(@.tag == 'segment')].values[0]").isEqualTo("77")
.jsonPath("$.availableTags[?(@.tag == 'profile')].values[0]").isEqualTo("profile:test")
.jsonPath("$.availableTags[?(@.tag == 'server')].values[0]").isEqualTo("speedy");

// assert metric for segment access is aggregate
client.get().uri("/actuator/metrics/http.server.requests?tag=segment:77")
.exchange().expectStatus().isOk().expectBody()
.jsonPath("$.measurements[?(@.statistic == 'COUNT')].value").isEqualTo(2.0)
.jsonPath("$.availableTags[?(@.tag == 'application')].values[0]").isEqualTo("pxf-service")
.jsonPath("$.availableTags[?(@.tag == 'user')].values[0]").isEqualTo("reader")
.jsonPath("$.availableTags[?(@.tag == 'user')].values[1]").isEqualTo("writer")
.jsonPath("$.availableTags[?(@.tag == 'profile')].values[0]").isEqualTo("profile:test")
.jsonPath("$.availableTags[?(@.tag == 'server')].values[0]").isEqualTo("speedy");

// hit the actuator health endpoint
client.get().uri("/actuator/health")
.exchange().expectStatus().isOk().expectBody()
.json("{\"status\":\"UP\",\"groups\":[\"liveness\",\"readiness\"]}");

// assert prometheus endpoint reflects the metric as well
String prometheusResponse = client.get().uri("/actuator/prometheus")
.exchange()
.expectStatus().isOk()
.expectBody(String.class).returnResult().getResponseBody();
assertNotNull(prometheusResponse);
assertTrue(prometheusResponse.contains("http_server_requests_seconds_count{application=\"pxf-service\",exception=\"None\",method=\"GET\",outcome=\"SUCCESS\",profile=\"profile:test\",segment=\"77\",server=\"speedy\",status=\"200\",uri=\"/pxf/read\",user=\"reader\",} 1.0\n"));
assertTrue(prometheusResponse.contains("http_server_requests_seconds_count{application=\"pxf-service\",exception=\"None\",method=\"POST\",outcome=\"SUCCESS\",profile=\"profile:test\",segment=\"77\",server=\"speedy\",status=\"200\",uri=\"/pxf/write\",user=\"writer\",} 1.0\n"));
assertTrue(prometheusResponse.contains("http_server_requests_seconds_count{application=\"pxf-service\",exception=\"None\",method=\"GET\",outcome=\"SUCCESS\",profile=\"unknown\",segment=\"unknown\",server=\"unknown\",status=\"200\",uri=\"/actuator/health\",user=\"unknown\",} 1.0\n"));
}

private void mockServices() throws IOException {
// mock ReadService
when(mockParser.parseRequest(any(), eq(RequestContext.RequestType.READ_BRIDGE))).thenReturn(mockContext);
Answer<Void> readAnswer = invocation -> {
// sleep to simulate time it takes to execute, check that reported metric takes into account async time
Thread.sleep(500);
invocation.getArgument(1, OutputStream.class).write("Hello from read!".getBytes(Charsets.UTF_8));
return null;
};
doAnswer(readAnswer).when(readService).readData(any(), any());

// mock WriteService
when(mockParser.parseRequest(any(), eq(RequestContext.RequestType.WRITE_BRIDGE))).thenReturn(mockContext);
when(mockWriteService.writeData(same(mockContext), any())).thenReturn("Hello from write!");
}

}
Expand Up @@ -6,7 +6,6 @@
import org.greenplum.pxf.service.RequestParser;
import org.greenplum.pxf.service.controller.ReadService;
import org.greenplum.pxf.service.controller.WriteService;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -51,11 +50,6 @@ public class PxfResourceIT {
@Mock
private RequestContext mockContext;

@BeforeAll
public static void init() {
System.setProperty("pxf.logdir", "/tmp");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yay! did you want to default pxf.logdir to /tmp as part of this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only defaulted for my tests, I can rebase and remove this altogether and default to /tmp in application.properties

}

@Test
public void testReadEndpoint() throws Exception {
when(mockParser.parseRequest(any(), eq(RequestContext.RequestType.READ_BRIDGE))).thenReturn(mockContext);
Expand Down