Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re enable HttpMetricsHandlerTests in netty5 branch #2337

Merged
merged 18 commits into from Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -20,6 +20,7 @@
import io.netty5.channel.Channel;
import io.netty5.channel.ChannelHandlerAdapter;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.handler.codec.http.HttpContent;
import io.netty5.handler.codec.http.HttpRequest;
import io.netty5.handler.codec.http.HttpResponse;
import io.netty5.handler.codec.http.LastHttpContent;
Expand Down Expand Up @@ -165,6 +166,9 @@ private long extractProcessedDataFromBuffer(Object msg) {
else if (msg instanceof Buffer) {
return ((Buffer) msg).readableBytes();
}
else if (msg instanceof HttpContent) {
return ((HttpContent) msg).payload().readableBytes();
}
return 0;
}

Expand Down
Expand Up @@ -82,6 +82,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
@Override
public void channelInboundEvent(ChannelHandlerContext ctx, Object evt) {
Channel channel = ctx.channel();
boolean removeThisHandler = false;
if (evt == UPGRADE_ISSUED) {
if (log.isDebugEnabled()) {
log.debug(format(channel, "An upgrade request was sent to the server."));
Expand All @@ -92,7 +93,7 @@ else if (evt == UPGRADE_SUCCESSFUL) {
log.debug(format(channel, "The upgrade to H2C protocol was successful."));
}
sendNewState(Connection.from(channel), HttpClientState.UPGRADE_SUCCESSFUL);
ctx.pipeline().remove(this);
removeThisHandler = true; // we have to remove ourself from the pipleline after having fired the event below.
pderop marked this conversation as resolved.
Show resolved Hide resolved
}
else if (evt == UPGRADE_REJECTED) {
if (log.isDebugEnabled()) {
Expand All @@ -101,6 +102,9 @@ else if (evt == UPGRADE_REJECTED) {
sendNewState(Connection.from(channel), HttpClientState.UPGRADE_REJECTED);
}
ctx.fireChannelInboundEvent(evt);
if (removeThisHandler) {
ctx.pipeline().remove(this);
}
}

void sendNewState(Connection connection, ConnectionObserver.State state) {
Expand Down
Expand Up @@ -19,6 +19,7 @@
import io.netty5.buffer.api.Buffer;
import io.netty5.channel.ChannelHandlerAdapter;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.handler.codec.http.HttpContent;
import io.netty5.handler.codec.http.HttpRequest;
import io.netty5.handler.codec.http.HttpResponse;
import io.netty5.handler.codec.http.HttpResponseStatus;
Expand Down Expand Up @@ -114,9 +115,12 @@ public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
dataSent += extractProcessedDataFromBuffer(msg);

if (msg instanceof LastHttpContent) {
// The listeners are now invoked asynchronously (see https://github.com/netty/netty/pull/9489),
// and it seems we need to first obtain the channelOps, which may not be present anymore
// when the listener will be invoked.
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
return ctx.write(msg)
.addListener(future -> {
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations ops) {
try {
recordWrite(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path),
Expand Down Expand Up @@ -208,6 +212,9 @@ private long extractProcessedDataFromBuffer(Object msg) {
else if (msg instanceof Buffer) {
return ((Buffer) msg).readableBytes();
}
else if (msg instanceof HttpContent) {
return ((HttpContent) msg).payload().readableBytes();
}
return 0;
}

Expand Down
Expand Up @@ -32,7 +32,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Named;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -93,7 +92,6 @@
/**
* @author Violeta Georgieva
*/
@Disabled
class HttpMetricsHandlerTests extends BaseHttpTest {
HttpServer httpServer;
private ConnectionProvider provider;
Expand Down Expand Up @@ -171,8 +169,9 @@ void tearDown() {
@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
CountDownLatch latch1 = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception {
int expectedCloses = getExpectedCloses(negociatedProtocol);
pderop marked this conversation as resolved.
Show resolved Hide resolved
CountDownLatch latch1 = new CountDownLatch(expectedCloses);
AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(latch1);
ConnectionObserver observerDisconnect = observeDisconnect(latchRef);

Expand Down Expand Up @@ -216,7 +215,7 @@ else if (clientProtocols.length == 2 &&
checkExpectationsExisting("/1", sa.getHostString() + ":" + sa.getPort(), 1, serverCtx != null,
numWrites[0], bytesWrite[0]);

CountDownLatch latch2 = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events
CountDownLatch latch2 = new CountDownLatch(expectedCloses);
latchRef.set(latch2);

StepVerifier.create(httpClient.post()
Expand All @@ -241,7 +240,8 @@ else if (clientProtocols.length == 2 &&
@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) {
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx,
@SuppressWarnings("unused") HttpProtocol negociatedProtocol) {
pderop marked this conversation as resolved.
Show resolved Hide resolved
disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols)
.metrics(true, id -> {
throw new IllegalArgumentException("Testcase injected Exception");
Expand All @@ -265,7 +265,8 @@ void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[]
@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) {
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx,
@SuppressWarnings("unused") HttpProtocol negociatedProtocol) {
pderop marked this conversation as resolved.
Show resolved Hide resolved
disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols)
.bindNow();

Expand All @@ -287,11 +288,10 @@ void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[]
@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception {
pderop marked this conversation as resolved.
Show resolved Hide resolved
// For HTTP11, we expect to observe 2 DISCONNECTS for client, and 2 DISCONNECT for server.
// Else, we expect to observe 2 DISCONNECTS for client, and 1 DISCONNECT for server.
boolean isHTTP11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11;
int expectedDisconnects = isHTTP11 ? 4 : 3;
int expectedDisconnects = getExpectedCloses(negociatedProtocol);
pderop marked this conversation as resolved.
Show resolved Hide resolved

CountDownLatch latch = new CountDownLatch(expectedDisconnects);
AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(latch);
Expand Down Expand Up @@ -369,8 +369,9 @@ else if (protocols.contains(HttpProtocol.H2) || protocols.contains(HttpProtocol.
@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testUriTagValueFunction(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
CountDownLatch latch1 = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception {
int expectedCloses = getExpectedCloses(negociatedProtocol);
pderop marked this conversation as resolved.
Show resolved Hide resolved
CountDownLatch latch1 = new CountDownLatch(expectedCloses);
AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(latch1);
ConnectionObserver observerDisconnect = observeDisconnect(latchRef);

Expand Down Expand Up @@ -420,8 +421,9 @@ else if (clientProtocols.length == 2 &&
@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testUriTagValueFunctionNotSharedForClient(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
CountDownLatch latch1 = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception {
int expectedCloses = getExpectedCloses(negociatedProtocol);
pderop marked this conversation as resolved.
Show resolved Hide resolved
CountDownLatch latch1 = new CountDownLatch(expectedCloses);
AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(latch1);
ConnectionObserver observerDisconnect = observeDisconnect(latchRef);

Expand Down Expand Up @@ -473,7 +475,7 @@ else if (clientProtocols.length == 2 &&
checkExpectationsExisting("testUriTagValueFunctionNotShared_1", sa.getHostString() + ":" + sa.getPort(),
1, serverCtx != null, numWrites[0], bytesWrite[0]);

CountDownLatch latch2 = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events
CountDownLatch latch2 = new CountDownLatch(expectedCloses);
latchRef.set(latch2);

httpClient.metrics(true, s -> "testUriTagValueFunctionNotShared_2")
Expand All @@ -499,7 +501,8 @@ else if (clientProtocols.length == 2 &&
@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx,
@SuppressWarnings("unused") HttpProtocol negociatedProtocol) throws Exception {
pderop marked this conversation as resolved.
Show resolved Hide resolved
disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols).bindNow();

ClientContextAwareRecorder recorder = ClientContextAwareRecorder.INSTANCE;
Expand Down Expand Up @@ -530,7 +533,8 @@ void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtoc
@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx,
@SuppressWarnings("unused") HttpProtocol negociatedProtocol) throws Exception {
pderop marked this conversation as resolved.
Show resolved Hide resolved
ServerContextAwareRecorder recorder = ServerContextAwareRecorder.INSTANCE;
disposableServer =
customizeServerOptions(httpServer, serverCtx, serverProtocols).metrics(true, () -> recorder)
Expand Down Expand Up @@ -562,8 +566,9 @@ void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtoc
@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
CountDownLatch latch = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception {
int expectedCloses = getExpectedCloses(negociatedProtocol);
pderop marked this conversation as resolved.
Show resolved Hide resolved
CountDownLatch latch = new CountDownLatch(expectedCloses);
AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(latch);
ConnectionObserver observerDisconnect = observeDisconnect(latchRef);

Expand Down Expand Up @@ -618,7 +623,8 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco
@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx,
@SuppressWarnings("unused") HttpProtocol negociatedProtocol) throws Exception {
pderop marked this conversation as resolved.
Show resolved Hide resolved
// Invoke ServerRecorder.INSTANCE.reset() here as disposableServer.dispose (AfterEach) might be invoked after
// ServerRecorder.INSTANCE.reset() (AfterEach) and thus leave ServerRecorder.INSTANCE in a bad state
ServerRecorder.INSTANCE.reset();
Expand Down Expand Up @@ -905,6 +911,18 @@ void checkGauge(String name, boolean exists, double expectedCount, String... tag
}
}

/**
* Get number of disconnect events we expect to observe on a given connection.
* @param protocol the protocol used (for HTTP11, we expect to observe 4 disconnect events, and for other (H2/H2C), we expect 3 events)).
* @return number of disconnect events we expect to observe on a given connection
*/
int getExpectedCloses(HttpProtocol protocol) {
return switch (protocol) {
case H2, H2C -> 3;
case HTTP11 -> 4;
};
}

static Stream<Arguments> http11CompatibleProtocols() {
return Stream.of(
Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null),
Expand All @@ -918,23 +936,23 @@ static Stream<Arguments> http11CompatibleProtocols() {

static Stream<Arguments> httpCompatibleProtocols() {
return Stream.of(
Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null),
Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null, HttpProtocol.HTTP11),
Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11},
Named.of("Http11SslContextSpec", serverCtx11), Named.of("Http11SslContextSpec", clientCtx11)),
Named.of("Http11SslContextSpec", serverCtx11), Named.of("Http11SslContextSpec", clientCtx11), HttpProtocol.HTTP11),
Arguments.of(new HttpProtocol[]{HttpProtocol.H2}, new HttpProtocol[]{HttpProtocol.H2},
Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)),
Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2), HttpProtocol.H2),
Arguments.of(new HttpProtocol[]{HttpProtocol.H2}, new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11},
Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)),
Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2), HttpProtocol.H2),
Arguments.of(new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11},
Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http11SslContextSpec", clientCtx11)),
Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http11SslContextSpec", clientCtx11), HttpProtocol.HTTP11),
Arguments.of(new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2},
Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)),
Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2), HttpProtocol.H2),
Arguments.of(new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11},
Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)),
Arguments.of(new HttpProtocol[]{HttpProtocol.H2C}, new HttpProtocol[]{HttpProtocol.H2C}, null, null),
Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null),
Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C}, null, null),
Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, null, null)
Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2), HttpProtocol.H2),
Arguments.of(new HttpProtocol[]{HttpProtocol.H2C}, new HttpProtocol[]{HttpProtocol.H2C}, null, null, HttpProtocol.H2C),
Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null, HttpProtocol.HTTP11),
Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C}, null, null, HttpProtocol.H2C),
Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, null, null, HttpProtocol.H2C)
);
}

Expand Down