Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re enable HttpMetricsHandlerTests in netty5 branch #2337

Merged
merged 18 commits into from Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -20,6 +20,7 @@
import io.netty5.channel.Channel;
import io.netty5.channel.ChannelHandlerAdapter;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.handler.codec.http.HttpContent;
import io.netty5.handler.codec.http.HttpRequest;
import io.netty5.handler.codec.http.HttpResponse;
import io.netty5.handler.codec.http.LastHttpContent;
Expand Down Expand Up @@ -165,6 +166,9 @@ private long extractProcessedDataFromBuffer(Object msg) {
else if (msg instanceof Buffer buffer) {
return buffer.readableBytes();
}
else if (msg instanceof HttpContent<?> httpContent) {
return httpContent.payload().readableBytes();
}
return 0;
}

Expand Down
Expand Up @@ -82,6 +82,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
@Override
public void channelInboundEvent(ChannelHandlerContext ctx, Object evt) {
Channel channel = ctx.channel();
boolean removeThisHandler = false;
if (evt == UPGRADE_ISSUED) {
if (log.isDebugEnabled()) {
log.debug(format(channel, "An upgrade request was sent to the server."));
Expand All @@ -92,7 +93,7 @@ else if (evt == UPGRADE_SUCCESSFUL) {
log.debug(format(channel, "The upgrade to H2C protocol was successful."));
}
sendNewState(Connection.from(channel), HttpClientState.UPGRADE_SUCCESSFUL);
ctx.pipeline().remove(this);
removeThisHandler = true; // we have to remove ourselves from the pipeline after having fired the event below.
}
else if (evt == UPGRADE_REJECTED) {
if (log.isDebugEnabled()) {
Expand All @@ -101,6 +102,9 @@ else if (evt == UPGRADE_REJECTED) {
sendNewState(Connection.from(channel), HttpClientState.UPGRADE_REJECTED);
}
ctx.fireChannelInboundEvent(evt);
if (removeThisHandler) {
ctx.pipeline().remove(this);
}
}

void sendNewState(Connection connection, ConnectionObserver.State state) {
Expand Down
Expand Up @@ -19,6 +19,7 @@
import io.netty5.buffer.api.Buffer;
import io.netty5.channel.ChannelHandlerAdapter;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.handler.codec.http.HttpContent;
import io.netty5.handler.codec.http.HttpRequest;
import io.netty5.handler.codec.http.HttpResponse;
import io.netty5.handler.codec.http.HttpResponseStatus;
Expand Down Expand Up @@ -114,9 +115,12 @@ public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
dataSent += extractProcessedDataFromBuffer(msg);

if (msg instanceof LastHttpContent) {
// The listeners are now invoked asynchronously (see https://github.com/netty/netty/pull/9489),
// and it seems we need to first obtain the channelOps, which may not be present anymore
// when the listener will be invoked.
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
return ctx.write(msg)
.addListener(future -> {
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations ops) {
try {
recordWrite(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path),
Expand Down Expand Up @@ -208,6 +212,9 @@ private long extractProcessedDataFromBuffer(Object msg) {
else if (msg instanceof Buffer buffer) {
return buffer.readableBytes();
}
else if (msg instanceof HttpContent<?> httpContent) {
return httpContent.payload().readableBytes();
}
return 0;
}

Expand Down