diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java index 99f993566d8..da7ddc45439 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java @@ -237,9 +237,7 @@ void handleH2TransportError(TriRpcStatus status) { void finishProcess(TriRpcStatus status, Http2Headers trailers, boolean isReturnTriException) { final Map reserved = filterReservedHeaders(trailers); - final Map attachments = headersToMap(trailers, () -> { - return reserved.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader()); - }); + final Map attachments = headersToMap(trailers, () -> reserved.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader())); final TriRpcStatus detailStatus; final TriRpcStatus statusFromTrailers = getStatusFromTrailers(reserved); if (statusFromTrailers != null) { @@ -449,23 +447,33 @@ public void onHeader(Http2Headers headers, boolean endStream) { @Override public void onData(ByteBuf data, boolean endStream) { - executor.execute(() -> { - if (transportError != null) { - transportError.appendDescription( - "Data:" + data.toString(StandardCharsets.UTF_8)); - ReferenceCountUtil.release(data); - if (transportError.description.length() > 512 || endStream) { - handleH2TransportError(transportError); - } - return; - } - if (!headerReceived) { - handleH2TransportError(TriRpcStatus.INTERNAL.withDescription( - "headers not received before payload")); - return; + try { + executor.execute(() -> doOnData(data, endStream)); + } catch (Throwable t) { + // Tasks will be rejected when the thread pool is closed or full, + // ByteBuf needs to be released to avoid out of heap memory leakage. + // For example, ThreadLessExecutor will be shutdown when request timeout {@link AsyncRpcResult} + ReferenceCountUtil.release(data); + LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "", "submit onData task failed", t); + } + } + + private void doOnData(ByteBuf data, boolean endStream) { + if (transportError != null) { + transportError.appendDescription( + "Data:" + data.toString(StandardCharsets.UTF_8)); + ReferenceCountUtil.release(data); + if (transportError.description.length() > 512 || endStream) { + handleH2TransportError(transportError); } - deframer.deframe(data); - }); + return; + } + if (!headerReceived) { + handleH2TransportError(TriRpcStatus.INTERNAL.withDescription( + "headers not received before payload")); + return; + } + deframer.deframe(data); } @Override diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java index be9bfc4951f..88e5cd0b3d0 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java @@ -17,9 +17,10 @@ package org.apache.dubbo.rpc.protocol.tri.stream; +import io.netty.util.ReferenceCountUtil; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.constants.CommonConstants; -import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.rpc.HeaderFilter; @@ -69,10 +70,11 @@ import java.util.concurrent.Executor; import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST; public class TripleServerStream extends AbstractStream implements ServerStream { - private static final Logger LOGGER = LoggerFactory.getLogger(TripleServerStream.class); + private static final ErrorTypeAwareLogger LOGGER = LoggerFactory.getErrorTypeAwareLogger(TripleServerStream.class); public final ServerTransportObserver transportObserver = new ServerTransportObserver(); private final TripleWriteQueue writeQueue; private final PathResolver pathResolver; @@ -408,11 +410,10 @@ private void processHeader(Http2Headers headers, boolean endStream) { } } - Map requestMetadata = headersToMap(headers, () -> { - return Optional.ofNullable(headers.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader())) - .map(CharSequence::toString) - .orElse(null); - }); + Map requestMetadata = headersToMap(headers, () -> + Optional.ofNullable(headers.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader())) + .map(CharSequence::toString) + .orElse(null)); boolean hasStub = pathResolver.hasNativeStub(path); if (hasStub) { listener = new StubAbstractServerCall(invoker, TripleServerStream.this, @@ -431,7 +432,15 @@ private void processHeader(Http2Headers headers, boolean endStream) { @Override public void onData(ByteBuf data, boolean endStream) { - executor.execute(() -> doOnData(data, endStream)); + try { + executor.execute(() -> doOnData(data, endStream)); + } catch (Throwable t) { + // Tasks will be rejected when the thread pool is closed or full, + // ByteBuf needs to be released to avoid out of heap memory leakage. + // For example, ThreadLessExecutor will be shutdown when request timeout {@link AsyncRpcResult} + ReferenceCountUtil.release(data); + LOGGER.error(PROTOCOL_FAILED_REQUEST, "", "", "submit onData task failed", t); + } } private void doOnData(ByteBuf data, boolean endStream) { @@ -454,10 +463,8 @@ public void cancelByRemote(long errorCode) { if (listener == null) { return; } - executor.execute(() -> { - listener.onCancelByRemote(TriRpcStatus.CANCELLED - .withDescription("Canceled by client ,errorCode=" + errorCode)); - }); + executor.execute(() -> listener.onCancelByRemote(TriRpcStatus.CANCELLED + .withDescription("Canceled by client ,errorCode=" + errorCode))); } }