Skip to content

Commit

Permalink
WIP debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiitk committed Sep 16, 2022
1 parent ade9961 commit 0f42407
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 15 deletions.
14 changes: 13 additions & 1 deletion core/src/main/java/io/grpc/internal/ClientCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import static java.lang.Math.max;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
Expand Down Expand Up @@ -669,7 +671,17 @@ private void runInternal() {
} catch (Throwable t) {
GrpcUtil.closeQuietly(producer);
exceptionThrown(
Status.CANCELLED.withCause(t).withDescription("Failed to read message."));
Status.CANCELLED
.withCause(t)
.withDescription(
"Failed to read message: "
+ t
+ ".\n\n***** Exception Trace: *****\n"
+ Throwables.getStackTraceAsString(t)
+ "***** END Exception Trace *****\n\n"
+ "***** Thread Trace: *****\n"
+ Joiner.on("\n").join(Thread.currentThread().getStackTrace())
+ "\n***** END Thread Trace: *****\n\n"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ protected GrpcHttp2ConnectionHandler(
super(decoder, encoder, initialSettings);
this.channelUnused = channelUnused;
this.negotiationLogger = negotiationLogger;
setCumulator(ADAPTIVE_CUMULATOR);
}

/**
Expand Down
95 changes: 84 additions & 11 deletions netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import java.util.logging.Logger;

class NettyAdaptiveCumulator implements io.netty.handler.codec.ByteToMessageDecoder.Cumulator {
private static final Logger logger = Logger.getLogger(NettyAdaptiveCumulator.class.getName());
private final int composeMinSize;

NettyAdaptiveCumulator(int composeMinSize) {
Expand Down Expand Up @@ -133,17 +137,30 @@ static void mergeWithCompositeTail(ByteBufAllocator alloc, CompositeByteBuf comp
int newBytes = in.readableBytes();
int tailIndex = composite.numComponents() - 1;
int tailStart = composite.toByteIndex(tailIndex);
int tailBytes = composite.capacity() - tailStart;
int tailBytes = composite.writerIndex() - tailStart;
int totalBytes = newBytes + tailBytes;

ByteBuf tail = composite.component(tailIndex);
ByteBuf merged = null;

try {
if (tail.refCnt() == 1 && !tail.isReadOnly() && totalBytes <= tail.maxCapacity()) {
System.err.println("\n\n==================================================================");
printBufDebug("INPUT", in);
printBufDebug("COMPOSITE", composite);
printBufDebug("component()", composite.component(tailIndex));

ByteBuf ic = composite.internalComponent(tailIndex);
printBufDebug("internalComponent()", "IC", ic);
printBufDebug("IC.duplicate()", "IC_DUP", ic.duplicate());
ByteBuf icr = ic.retainedDuplicate();
printBufDebug("IC.retainedDuplicate()", "IC_RET_DUP", icr);
icr.release();

// Ideal case: the tail isn't shared, and can be expanded to the required capacity.
// Take ownership of the tail.
merged = tail.retain();

/*
* The tail is a readable non-composite buffer, so writeBytes() handles everything for us.
*
Expand All @@ -157,24 +174,52 @@ static void mergeWithCompositeTail(ByteBufAllocator alloc, CompositeByteBuf comp
* as pronounced because the capacity is doubled with each reallocation.
*/
merged.writeBytes(in);
printBufDebug("MERGED", merged);

// Store readerIndex to avoid out of bounds writerIndex during component replacement.
int prevReader = composite.readerIndex();
// Remove the tail, reset writer index, add merged component.
composite.removeComponent(tailIndex);
composite.setIndex(0, tailStart);
composite.addFlattenedComponents(true, merged);
merged = null;
in.release();
in = null;
// Restore the reader.
composite.readerIndex(prevReader);

printBufDebug("COMPOSITE RESULT", composite);
System.err.println("==================================================================\n\n");
} else {
System.err.println("\n\n+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
printBufDebug("INPUT", in);
printBufDebug("COMPOSITE", composite);

// The tail is shared, or not expandable. Replace it with a new buffer of desired capacity.
merged = alloc.buffer(alloc.calculateNewCapacity(totalBytes, Integer.MAX_VALUE));
merged.setBytes(0, composite, tailStart, tailBytes)
.setBytes(tailBytes, in, in.readerIndex(), newBytes)
.writerIndex(totalBytes);
printBufDebug("NEW MERGED", merged);

in.readerIndex(in.writerIndex());

// Store readerIndex to avoid out of bounds writerIndex during component replacement.
int prevReader = composite.readerIndex();
// Remove the tail, reset writer index, add merged component.
composite.removeComponent(tailIndex);
composite.setIndex(0, tailStart);
composite.addFlattenedComponents(true, merged);
merged = null;
in.release();
in = null;
// Restore the reader.
composite.readerIndex(prevReader);

printBufDebug("COMPOSITE RESULT", composite);
System.err.println("\n\n+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
}
// Store readerIndex to avoid out of bounds writerIndex during component replacement.
int prevReader = composite.readerIndex();
// Remove the tail, reset writer index, add merged component.
composite.removeComponent(tailIndex).setIndex(0, tailStart)
.addFlattenedComponents(true, merged);
merged = null;
in.release();
in = null;
// Restore the reader.
composite.readerIndex(prevReader);

} finally {
// Input buffer was merged with the tail.
if (in != null) {
Expand All @@ -186,4 +231,32 @@ static void mergeWithCompositeTail(ByteBufAllocator alloc, CompositeByteBuf comp
}
}
}

private static void printBufDebug(String title, ByteBuf buf) {
printBufDebug(title, title, buf);
}

private static void printBufDebug(String title, String prefix, ByteBuf buf) {
String msg = "$$$$$$$$$$$$$$$$$$$$ " + title + " $$$$$$$$$$$$$$$$$$$$";
int len = msg.length();
msg += "\n";
msg += ByteBufUtil.prettyHexDump(buf, 0, buf.readerIndex() + buf.readableBytes()) + "\n";
msg += prefix + " " + buf + "\n";
msg += prefix + " " + getIndexes(buf) + "\n";
msg += prefix + " " + buf.getClass().getName() + "\n";
if (buf instanceof CompositeByteBuf) {
CompositeByteBuf composite = (CompositeByteBuf) buf;
int tailIndex = composite.numComponents() - 1;
msg += composite.componentTypes(prefix + " tail component " + tailIndex, tailIndex);
} else {
msg += CompositeByteBuf.getBufTypes(prefix, buf);
}
msg += Strings.repeat("$", len) + "\n";
logger.info(msg);
}

private static String getIndexes(ByteBuf buf) {
return "rix=0x" + Integer.toHexString(buf.readerIndex())
+ " wix=0x" + Integer.toHexString(buf.writerIndex());
}
}
11 changes: 10 additions & 1 deletion netty/src/main/java/io/grpc/netty/NettyClientHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ private NettyClientHandler(
this.authority = authority;
this.attributes = Attributes.newBuilder()
.set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttributes).build();
setCumulator(ADAPTIVE_CUMULATOR);
// setCumulator(COMPOSITE_CUMULATOR);
// setCumulator(MERGE_CUMULATOR);

// Set the frame listener on the decoder.
decoder().frameListener(new FrameListener());
Expand Down Expand Up @@ -497,6 +500,12 @@ InternalChannelz.Security getSecurityInfo() {
protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
Http2Exception http2Ex) {
logger.log(Level.FINE, "Caught a connection error", cause);
if (http2Ex != null) {
logger.log(Level.WARNING, "#### http2 ex: " + http2Ex + " err:" + http2Ex.error() + " shut:"
+ http2Ex.shutdownHint(), cause);
} else {
logger.log(Level.WARNING, "#### http2 ex: null", cause);
}
lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause));
// Parent class will shut down the Channel
super.onConnectionError(ctx, outbound, cause, http2Ex);
Expand Down Expand Up @@ -875,7 +884,7 @@ private Status statusFromH2Error(
if (statusCode == null) {
statusCode = status.getCode();
}
String debugString = "";
String debugString = "statusFromH2Error() debugString: ";
if (debugData != null && debugData.length > 0) {
// If a debug message was provided, use it.
debugString = ", debug data: " + new String(debugData, UTF_8);
Expand Down
1 change: 1 addition & 0 deletions netty/src/main/java/io/grpc/netty/NettyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ private NettyServerHandler(
Attributes eagAttributes) {
super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(),
autoFlowControl, null);
// setCumulator(ADAPTIVE_CUMULATOR);

final MaxConnectionIdleManager maxConnectionIdleManager;
if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {
Expand Down
14 changes: 13 additions & 1 deletion netty/src/main/java/io/grpc/netty/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import static io.netty.util.CharsetUtil.UTF_8;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.grpc.InternalChannelz;
import io.grpc.InternalMetadata;
import io.grpc.Metadata;
Expand Down Expand Up @@ -286,7 +288,17 @@ public static Status statusFromThrowable(Throwable t) {
return Status.UNAVAILABLE.withDescription("unresolved address").withCause(t);
}
if (t instanceof Http2Exception) {
return Status.INTERNAL.withDescription("http2 exception").withCause(t);
return Status.INTERNAL
.withDescription(
"http2 exception: "
+ t
+ ".\n\n***** Exception Trace: *****\n"
+ Throwables.getStackTraceAsString(t)
+ "***** END Exception Trace *****\n\n"
+ "***** Thread Trace: *****\n"
+ Joiner.on("\n").join(Thread.currentThread().getStackTrace())
+ "\n***** END Thread Trace: *****\n\n")
.withCause(t);
}
return s;
}
Expand Down

0 comments on commit 0f42407

Please sign in to comment.