Skip to content

Commit

Permalink
WIP debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiitk committed Sep 20, 2022
1 parent ade9961 commit 14aadb4
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 15 deletions.
14 changes: 13 additions & 1 deletion core/src/main/java/io/grpc/internal/ClientCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import static java.lang.Math.max;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
Expand Down Expand Up @@ -669,7 +671,17 @@ private void runInternal() {
} catch (Throwable t) {
GrpcUtil.closeQuietly(producer);
exceptionThrown(
Status.CANCELLED.withCause(t).withDescription("Failed to read message."));
Status.CANCELLED
.withCause(t)
.withDescription(
"Failed to read message: "
+ t
+ ".\n\n***** Exception Trace: *****\n"
+ Throwables.getStackTraceAsString(t)
+ "***** END Exception Trace *****\n\n"
+ "***** Thread Trace: *****\n"
+ Joiner.on("\n").join(Thread.currentThread().getStackTrace())
+ "\n***** END Thread Trace: *****\n\n"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ protected GrpcHttp2ConnectionHandler(
super(decoder, encoder, initialSettings);
this.channelUnused = channelUnused;
this.negotiationLogger = negotiationLogger;
setCumulator(ADAPTIVE_CUMULATOR);
}

/**
Expand Down
115 changes: 104 additions & 11 deletions netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import java.util.logging.Logger;

class NettyAdaptiveCumulator implements io.netty.handler.codec.ByteToMessageDecoder.Cumulator {
private static final Logger logger = Logger.getLogger(NettyAdaptiveCumulator.class.getName());
private final int composeMinSize;

NettyAdaptiveCumulator(int composeMinSize) {
Expand Down Expand Up @@ -60,6 +64,9 @@ class NettyAdaptiveCumulator implements io.netty.handler.codec.ByteToMessageDeco
public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
if (!cumulation.isReadable()) {
cumulation.release();
logger.info("-------------------------- Return As Is --------------------------");
printBufDebug("INPUT AS IS", in);
logger.info("------------------------------------------------------------------");
return in;
}
CompositeByteBuf composite = null;
Expand All @@ -69,11 +76,29 @@ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBu
// Writer index must equal capacity if we are going to "write"
// new components to the end
if (composite.writerIndex() != composite.capacity()) {
logger.info("-------------------------- Adjust capacity --------------------------");
// printBufDebug("COMPOSITE ADJUST: INPUT", in);
printBufDebug("COMPOSITE ADJUST: PRE", composite);
composite.capacity(composite.writerIndex());
printBufDebug("COMPOSITE ADJUST: RESULT", composite);
logger.info("----------------------------------------------------------------------");
}
} else {
logger.info("-------------------------- Allocate new composite --------------------------");
// printBufDebug("ALLOC NEW: INPUT", in);
logger.warning("************ instanceof: " + (cumulation instanceof CompositeByteBuf) + "************");
logger.warning("************ refCnt: " + (cumulation.refCnt()) + "************");
logger.warning("************ instanceof && refCnt: "
+ (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) + " ************");
logger.warning("************ (instanceof) && refCnt: "
+ ((cumulation instanceof CompositeByteBuf) && cumulation.refCnt() == 1) + " ************");
logger.warning("************ (instanceof) && (refCnt): "
+ ((cumulation instanceof CompositeByteBuf) && (cumulation.refCnt() == 1)) + " ************");
printBufDebug("ALLOC NEW COMPOSITE: CUMULATION", cumulation);
composite = alloc.compositeBuffer(Integer.MAX_VALUE)
.addFlattenedComponents(true, cumulation);
printBufDebug("ALLOC NEW COMPOSITE: RESULT", composite);
logger.info("----------------------------------------------------------------------------");
}
addInput(alloc, composite, in);
in = null;
Expand All @@ -93,7 +118,11 @@ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBu
@VisibleForTesting
void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
if (shouldCompose(composite, in, composeMinSize)) {
logger.info("+++++++++++++++++++++ Composing new component +++++++++++++++++++++");
printBufDebug("COMPOSITE ADD COMPONENT: INPUT", in);
composite.addFlattenedComponents(true, in);
printBufDebug("COMPOSITE ADD COMPONENT: RESULT", composite);
logger.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
} else {
// The total size of the new data and the last component are below the threshold. Merge them.
mergeWithCompositeTail(alloc, composite, in);
Expand Down Expand Up @@ -133,17 +162,27 @@ static void mergeWithCompositeTail(ByteBufAllocator alloc, CompositeByteBuf comp
int newBytes = in.readableBytes();
int tailIndex = composite.numComponents() - 1;
int tailStart = composite.toByteIndex(tailIndex);
int tailBytes = composite.capacity() - tailStart;
int tailBytes = composite.writerIndex() - tailStart;
int totalBytes = newBytes + tailBytes;

ByteBuf tail = composite.component(tailIndex);
ByteBuf merged = null;

try {
if (tail.refCnt() == 1 && !tail.isReadOnly() && totalBytes <= tail.maxCapacity()) {
logger.info("========================= Extending Tail =========================");
printBufDebug("INPUT", in);
printBufDebug("COMPOSITE", composite);
printBufDebug("component()", composite.component(tailIndex));

ByteBuf ic = composite.internalComponent(tailIndex);
printBufDebug("internalComponent()", "IC", ic);
printBufDebug("IC.duplicate()", "IC_DUP", ic.duplicate());

// Ideal case: the tail isn't shared, and can be expanded to the required capacity.
// Take ownership of the tail.
merged = tail.retain();

/*
* The tail is a readable non-composite buffer, so writeBytes() handles everything for us.
*
Expand All @@ -157,24 +196,52 @@ static void mergeWithCompositeTail(ByteBufAllocator alloc, CompositeByteBuf comp
* as pronounced because the capacity is doubled with each reallocation.
*/
merged.writeBytes(in);
printBufDebug("MERGED", merged);

// Store readerIndex to avoid out of bounds writerIndex during component replacement.
int prevReader = composite.readerIndex();
// Remove the tail, reset writer index, add merged component.
composite.removeComponent(tailIndex);
composite.setIndex(0, tailStart);
composite.addFlattenedComponents(true, merged);
merged = null;
in.release();
in = null;
// Restore the reader.
composite.readerIndex(prevReader);

printBufDebug("COMPOSITE MERGE RESULT", composite);
logger.info("==================================================================\n\n");
} else {
logger.info("^^^^^^^^^^^^^^^^^^^^^^^^^^ Reallocating new tail ^^^^^^^^^^^^^^^^^^^^^^^^^^^");
printBufDebug("INPUT", in);
printBufDebug("COMPOSITE", composite);

// The tail is shared, or not expandable. Replace it with a new buffer of desired capacity.
merged = alloc.buffer(alloc.calculateNewCapacity(totalBytes, Integer.MAX_VALUE));
merged.setBytes(0, composite, tailStart, tailBytes)
.setBytes(tailBytes, in, in.readerIndex(), newBytes)
.writerIndex(totalBytes);
printBufDebug("NEW MERGED", merged);

in.readerIndex(in.writerIndex());

// Store readerIndex to avoid out of bounds writerIndex during component replacement.
int prevReader = composite.readerIndex();
// Remove the tail, reset writer index, add merged component.
composite.removeComponent(tailIndex);
composite.setIndex(0, tailStart);
composite.addFlattenedComponents(true, merged);
merged = null;
in.release();
in = null;
// Restore the reader.
composite.readerIndex(prevReader);

printBufDebug("COMPOSITE REALOC RESULT", composite);
logger.info("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^");
}
// Store readerIndex to avoid out of bounds writerIndex during component replacement.
int prevReader = composite.readerIndex();
// Remove the tail, reset writer index, add merged component.
composite.removeComponent(tailIndex).setIndex(0, tailStart)
.addFlattenedComponents(true, merged);
merged = null;
in.release();
in = null;
// Restore the reader.
composite.readerIndex(prevReader);

} finally {
// Input buffer was merged with the tail.
if (in != null) {
Expand All @@ -186,4 +253,30 @@ static void mergeWithCompositeTail(ByteBufAllocator alloc, CompositeByteBuf comp
}
}
}

private static void printBufDebug(String title, ByteBuf buf) {
printBufDebug(title, title, buf);
}

private static void printBufDebug(String title, String prefix, ByteBuf buf) {
String msg = "$$$$$$$$$$$$$$$$$$$$ " + title + " $$$$$$$$$$$$$$$$$$$$";
int len = msg.length();
msg += "\n";
msg += ByteBufUtil.prettyHexDump(buf, 0, buf.readerIndex() + buf.readableBytes()) + "\n";
msg += prefix + " " + buf + " refCnt=" + buf.refCnt() + "\n";
// msg += prefix + " " + buf.getClass().getName() + "\n";
if (buf instanceof CompositeByteBuf) {
CompositeByteBuf composite = (CompositeByteBuf) buf;
msg += composite.allComponentTypes(prefix);
} else {
// msg += CompositeByteBuf.getBufTypes(prefix, buf);
}
msg += Strings.repeat("$", len) + "\n";
logger.info(msg);
}

private static String getIndexes(ByteBuf buf) {
return "rix=0x" + Integer.toHexString(buf.readerIndex())
+ " wix=0x" + Integer.toHexString(buf.writerIndex());
}
}
11 changes: 10 additions & 1 deletion netty/src/main/java/io/grpc/netty/NettyClientHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ private NettyClientHandler(
this.authority = authority;
this.attributes = Attributes.newBuilder()
.set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttributes).build();
setCumulator(ADAPTIVE_CUMULATOR);
// setCumulator(COMPOSITE_CUMULATOR);
// setCumulator(MERGE_CUMULATOR);

// Set the frame listener on the decoder.
decoder().frameListener(new FrameListener());
Expand Down Expand Up @@ -497,6 +500,12 @@ InternalChannelz.Security getSecurityInfo() {
protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
Http2Exception http2Ex) {
logger.log(Level.FINE, "Caught a connection error", cause);
if (http2Ex != null) {
logger.log(Level.WARNING, "#### http2 ex: " + http2Ex + " err:" + http2Ex.error() + " shut:"
+ http2Ex.shutdownHint(), cause);
} else {
logger.log(Level.WARNING, "#### http2 ex: null", cause);
}
lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause));
// Parent class will shut down the Channel
super.onConnectionError(ctx, outbound, cause, http2Ex);
Expand Down Expand Up @@ -875,7 +884,7 @@ private Status statusFromH2Error(
if (statusCode == null) {
statusCode = status.getCode();
}
String debugString = "";
String debugString = "statusFromH2Error() debugString: ";
if (debugData != null && debugData.length > 0) {
// If a debug message was provided, use it.
debugString = ", debug data: " + new String(debugData, UTF_8);
Expand Down
1 change: 1 addition & 0 deletions netty/src/main/java/io/grpc/netty/NettyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ private NettyServerHandler(
Attributes eagAttributes) {
super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(),
autoFlowControl, null);
// setCumulator(ADAPTIVE_CUMULATOR);

final MaxConnectionIdleManager maxConnectionIdleManager;
if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {
Expand Down
14 changes: 13 additions & 1 deletion netty/src/main/java/io/grpc/netty/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import static io.netty.util.CharsetUtil.UTF_8;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.grpc.InternalChannelz;
import io.grpc.InternalMetadata;
import io.grpc.Metadata;
Expand Down Expand Up @@ -286,7 +288,17 @@ public static Status statusFromThrowable(Throwable t) {
return Status.UNAVAILABLE.withDescription("unresolved address").withCause(t);
}
if (t instanceof Http2Exception) {
return Status.INTERNAL.withDescription("http2 exception").withCause(t);
return Status.INTERNAL
.withDescription(
"http2 exception: "
+ t
+ ".\n\n***** Exception Trace: *****\n"
+ Throwables.getStackTraceAsString(t)
+ "***** END Exception Trace *****\n\n"
+ "***** Thread Trace: *****\n"
+ Joiner.on("\n").join(Thread.currentThread().getStackTrace())
+ "\n***** END Thread Trace: *****\n\n")
.withCause(t);
}
return s;
}
Expand Down

0 comments on commit 14aadb4

Please sign in to comment.