Skip to content

Commit

Permalink
Merge #2237 into netty5
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Jun 24, 2022
2 parents 5197024 + 12544db commit c8cf6b9
Show file tree
Hide file tree
Showing 7 changed files with 341 additions and 121 deletions.
Expand Up @@ -22,6 +22,8 @@
import io.netty5.channel.socket.DatagramPacket;
import io.netty5.util.concurrent.Future;
import reactor.netty.NettyPipeline;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

import java.net.SocketAddress;
Expand All @@ -34,6 +36,8 @@
*/
public abstract class AbstractChannelMetricsHandler extends ChannelHandlerAdapter {

private static final Logger log = Loggers.getLogger(AbstractChannelMetricsHandler.class);

final SocketAddress remoteAddress;

final boolean onServer;
Expand All @@ -46,15 +50,27 @@ protected AbstractChannelMetricsHandler(@Nullable SocketAddress remoteAddress, b
@Override
public void channelActive(ChannelHandlerContext ctx) {
if (onServer) {
recorder().recordServerConnectionOpened(ctx.channel().localAddress());
try {
recorder().recordServerConnectionOpened(ctx.channel().localAddress());
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
ctx.fireChannelActive();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (onServer) {
recorder().recordServerConnectionClosed(ctx.channel().localAddress());
try {
recorder().recordServerConnectionClosed(ctx.channel().localAddress());
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
ctx.fireChannelInactive();
}
Expand All @@ -79,41 +95,59 @@ public void channelRegistered(ChannelHandlerContext ctx) {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof Buffer buffer) {
if (buffer.readableBytes() > 0) {
recordRead(ctx, remoteAddress, buffer.readableBytes());
try {
if (msg instanceof Buffer buffer) {
if (buffer.readableBytes() > 0) {
recordRead(ctx, remoteAddress, buffer.readableBytes());
}
}
}
else if (msg instanceof DatagramPacket p) {
Buffer buffer = p.content();
if (buffer.readableBytes() > 0) {
recordRead(ctx, remoteAddress != null ? remoteAddress : p.sender(), buffer.readableBytes());
else if (msg instanceof DatagramPacket p) {
Buffer buffer = p.content();
if (buffer.readableBytes() > 0) {
recordRead(ctx, remoteAddress != null ? remoteAddress : p.sender(), buffer.readableBytes());
}
}
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}

ctx.fireChannelRead(msg);
}

@Override
public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof Buffer buffer) {
if (buffer.readableBytes() > 0) {
recordWrite(ctx, remoteAddress, buffer.readableBytes());
try {
if (msg instanceof Buffer buffer) {
if (buffer.readableBytes() > 0) {
recordWrite(ctx, remoteAddress, buffer.readableBytes());
}
}
}
else if (msg instanceof DatagramPacket p) {
Buffer buffer = p.content();
if (buffer.readableBytes() > 0) {
recordWrite(ctx, remoteAddress != null ? remoteAddress : p.recipient(), buffer.readableBytes());
else if (msg instanceof DatagramPacket p) {
Buffer buffer = p.content();
if (buffer.readableBytes() > 0) {
recordWrite(ctx, remoteAddress != null ? remoteAddress : p.recipient(), buffer.readableBytes());
}
}
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}

return ctx.write(msg);
}

@Override
public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
recordException(ctx, remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress());
try {
recordException(ctx, remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress());
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}

ctx.fireChannelExceptionCaught(cause);
}
Expand Down
Expand Up @@ -22,6 +22,8 @@
import io.netty5.util.concurrent.Promise;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.internal.util.MapUtils;
import reactor.util.Logger;
import reactor.util.Loggers;

import java.net.SocketAddress;
import java.time.Duration;
Expand All @@ -39,6 +41,8 @@
*/
class AddressResolverGroupMetrics<T extends SocketAddress> extends AddressResolverGroup<T> {

private static final Logger log = Loggers.getLogger(AddressResolverGroupMetrics.class);

static final ConcurrentMap<Integer, AddressResolverGroupMetrics<?>> cache = new ConcurrentHashMap<>();

static AddressResolverGroupMetrics<?> getOrCreate(
Expand Down Expand Up @@ -125,10 +129,16 @@ Future<List<T>> resolveAllInternal(SocketAddress address, Supplier<Future<List<T
}

void record(long resolveTimeStart, String status, SocketAddress remoteAddress) {
recorder.recordResolveAddressTime(
remoteAddress,
Duration.ofNanos(System.nanoTime() - resolveTimeStart),
status);
try {
recorder.recordResolveAddressTime(
remoteAddress,
Duration.ofNanos(System.nanoTime() - resolveTimeStart),
status);
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
}
}
Expand Up @@ -407,15 +407,21 @@ protected void initChannel(Channel channel) {
ChannelOperations.addMetricsHandler(channel, config.metricsRecorder, remoteAddress, onServer);

if (Metrics.isInstrumentationAvailable()) {
ByteBufAllocator alloc = channel.alloc();
if (alloc instanceof PooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("pooled", ((PooledByteBufAllocator) alloc).metric(), alloc);
try {
ByteBufAllocator alloc = channel.alloc();
if (alloc instanceof PooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("pooled", ((PooledByteBufAllocator) alloc).metric(), alloc);
}
else if (alloc instanceof UnpooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("unpooled", ((UnpooledByteBufAllocator) alloc).metric(), alloc);
}

MicrometerEventLoopMeterRegistrar.INSTANCE.registerMetrics(channel.executor());
}
else if (alloc instanceof UnpooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("unpooled", ((UnpooledByteBufAllocator) alloc).metric(), alloc);
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}

MicrometerEventLoopMeterRegistrar.INSTANCE.registerMetrics(channel.executor());
}
}

Expand Down
Expand Up @@ -16,8 +16,10 @@
package reactor.netty.transport;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.config.MeterFilter;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.netty5.channel.EventLoop;
import io.netty5.util.concurrent.SingleThreadEventExecutor;
Expand Down Expand Up @@ -124,6 +126,52 @@ void testEventLoopMetrics() throws InterruptedException {
}
}

// https://github.com/reactor/reactor-netty/issues/2187
@Test
void testEventLoopMetricsFailure() throws InterruptedException {
registry.config().meterFilter(new MeterFilter() {
@Override
public Meter.Id map(Meter.Id id) {
throw new IllegalArgumentException("Test injected Exception");
}
});

final CountDownLatch latch = new CountDownLatch(1);
DisposableServer server = null;
Connection client = null;
LoopResources loop = null;

try {
loop = LoopResources.create(TransportEventLoopMetricsTest.class.getName(), 3, true);
server = TcpServer.create()
.port(0)
.metrics(true)
.runOn(loop)
.doOnConnection(c -> latch.countDown())
.bindNow();

assertThat(server).isNotNull();
client = TcpClient.create()
.port(server.port())
.connectNow();

assertThat(client).isNotNull();
assertThat(latch.await(5, TimeUnit.SECONDS)).as("Failed to connect").isTrue();
}

finally {
if (client != null) {
client.disposeNow();
}
if (server != null) {
server.disposeNow();
}
if (loop != null) {
loop.disposeLater().block(Duration.ofSeconds(10));
}
}
}

private double getGaugeValue(String name, String... tags) {
Gauge gauge = registry.find(name).tags(tags).gauge();
double result = -1;
Expand Down

0 comments on commit c8cf6b9

Please sign in to comment.