Skip to content

Commit

Permalink
Re enable HttpMetricsHandlerTests in netty5 branch (#2337)
Browse files Browse the repository at this point in the history
Related to #1873

This PR is an attempt to bring back the HttpMetricsHandlerTests into netty5 branch.
What has been done:

- in AbstractHttpClientMetricsHandler.java: it seems that now the processed data must also be extracted from the HttpContent parameter that can be passed as argument.
- Same thing done for AbstractHttpServerMetricsHandler.java: the HttpContent must be checked from the extractProcessedDataFromBuffer method.
- in HttpTrafficHandler.java, it seems that the handler can't fire an event after having removed itself from the pipeline, so the removal from the pipeline is now done after the event is fired.
- In AbstractHttpServerMetricsHandler.write method, sometimes the ChannelOperations seems to not be available anymore during listener invocation. This is probably caused by this Always notify FutureListener via the EventExecutor netty/netty#9489, where listeners are now invoked asynchronously. So the ChannelOperations is retrieved and cached before the listener is added.
- HttpMetricsHandlerTests.java: the test has been refactored in order to always expect to observe 4 disconnects when H1 is used, and three disconnects when H2 or H2C is used.
- The test is now rescheduling tasks in the event loops in order to ensure that meters are fully updated (remember that the meters are updated from listeners which are unfortunately rescheduled in event loops queue).
  • Loading branch information
pderop committed Jun 30, 2022
1 parent a45b247 commit 41a33ab
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 49 deletions.
Expand Up @@ -20,6 +20,7 @@
import io.netty5.channel.Channel;
import io.netty5.channel.ChannelHandlerAdapter;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.handler.codec.http.HttpContent;
import io.netty5.handler.codec.http.HttpRequest;
import io.netty5.handler.codec.http.HttpResponse;
import io.netty5.handler.codec.http.LastHttpContent;
Expand Down Expand Up @@ -165,6 +166,9 @@ private long extractProcessedDataFromBuffer(Object msg) {
else if (msg instanceof Buffer buffer) {
return buffer.readableBytes();
}
else if (msg instanceof HttpContent<?> httpContent) {
return httpContent.payload().readableBytes();
}
return 0;
}

Expand Down
Expand Up @@ -82,6 +82,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
@Override
public void channelInboundEvent(ChannelHandlerContext ctx, Object evt) {
Channel channel = ctx.channel();
boolean removeThisHandler = false;
if (evt == UPGRADE_ISSUED) {
if (log.isDebugEnabled()) {
log.debug(format(channel, "An upgrade request was sent to the server."));
Expand All @@ -92,7 +93,7 @@ else if (evt == UPGRADE_SUCCESSFUL) {
log.debug(format(channel, "The upgrade to H2C protocol was successful."));
}
sendNewState(Connection.from(channel), HttpClientState.UPGRADE_SUCCESSFUL);
ctx.pipeline().remove(this);
removeThisHandler = true; // we have to remove ourselves from the pipeline after having fired the event below.
}
else if (evt == UPGRADE_REJECTED) {
if (log.isDebugEnabled()) {
Expand All @@ -101,6 +102,9 @@ else if (evt == UPGRADE_REJECTED) {
sendNewState(Connection.from(channel), HttpClientState.UPGRADE_REJECTED);
}
ctx.fireChannelInboundEvent(evt);
if (removeThisHandler) {
ctx.pipeline().remove(this);
}
}

void sendNewState(Connection connection, ConnectionObserver.State state) {
Expand Down
Expand Up @@ -19,6 +19,7 @@
import io.netty5.buffer.api.Buffer;
import io.netty5.channel.ChannelHandlerAdapter;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.handler.codec.http.HttpContent;
import io.netty5.handler.codec.http.HttpRequest;
import io.netty5.handler.codec.http.HttpResponse;
import io.netty5.handler.codec.http.HttpResponseStatus;
Expand Down Expand Up @@ -114,9 +115,12 @@ public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
dataSent += extractProcessedDataFromBuffer(msg);

if (msg instanceof LastHttpContent) {
// The listeners are now invoked asynchronously (see https://github.com/netty/netty/pull/9489),
// and it seems we need to first obtain the channelOps, which may not be present anymore
// when the listener will be invoked.
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
return ctx.write(msg)
.addListener(future -> {
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations ops) {
try {
recordWrite(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path),
Expand Down Expand Up @@ -208,6 +212,9 @@ private long extractProcessedDataFromBuffer(Object msg) {
else if (msg instanceof Buffer buffer) {
return buffer.readableBytes();
}
else if (msg instanceof HttpContent<?> httpContent) {
return httpContent.payload().readableBytes();
}
return 0;
}

Expand Down

0 comments on commit 41a33ab

Please sign in to comment.