Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release ByteBuf when handle onData failed #13102

Merged
merged 8 commits into from Oct 10, 2023
Expand Up @@ -237,9 +237,7 @@ void handleH2TransportError(TriRpcStatus status) {

void finishProcess(TriRpcStatus status, Http2Headers trailers, boolean isReturnTriException) {
final Map<String, String> reserved = filterReservedHeaders(trailers);
final Map<String, Object> attachments = headersToMap(trailers, () -> {
return reserved.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader());
});
final Map<String, Object> attachments = headersToMap(trailers, () -> reserved.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader()));
final TriRpcStatus detailStatus;
final TriRpcStatus statusFromTrailers = getStatusFromTrailers(reserved);
if (statusFromTrailers != null) {
Expand Down Expand Up @@ -449,23 +447,33 @@ public void onHeader(Http2Headers headers, boolean endStream) {

@Override
public void onData(ByteBuf data, boolean endStream) {
executor.execute(() -> {
if (transportError != null) {
transportError.appendDescription(
"Data:" + data.toString(StandardCharsets.UTF_8));
ReferenceCountUtil.release(data);
if (transportError.description.length() > 512 || endStream) {
handleH2TransportError(transportError);
}
return;
}
if (!headerReceived) {
handleH2TransportError(TriRpcStatus.INTERNAL.withDescription(
"headers not received before payload"));
return;
try {
executor.execute(() -> doOnData(data, endStream));
} catch (Throwable t) {
// Tasks will be rejected when the thread pool is closed or full,
// ByteBuf needs to be released to avoid out of heap memory leakage.
// For example, ThreadLessExecutor will be shutdown when request timeout {@link AsyncRpcResult}
ReferenceCountUtil.release(data);
LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "", "submit onData task failed", t);
}
}

private void doOnData(ByteBuf data, boolean endStream) {
if (transportError != null) {
transportError.appendDescription(
"Data:" + data.toString(StandardCharsets.UTF_8));
ReferenceCountUtil.release(data);
if (transportError.description.length() > 512 || endStream) {
handleH2TransportError(transportError);
}
deframer.deframe(data);
});
return;
}
if (!headerReceived) {
handleH2TransportError(TriRpcStatus.INTERNAL.withDescription(
"headers not received before payload"));
return;
}
deframer.deframe(data);
}

@Override
Expand Down
Expand Up @@ -17,9 +17,10 @@

package org.apache.dubbo.rpc.protocol.tri.stream;

import io.netty.util.ReferenceCountUtil;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.HeaderFilter;
Expand Down Expand Up @@ -69,10 +70,11 @@
import java.util.concurrent.Executor;

import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST;

public class TripleServerStream extends AbstractStream implements ServerStream {

private static final Logger LOGGER = LoggerFactory.getLogger(TripleServerStream.class);
private static final ErrorTypeAwareLogger LOGGER = LoggerFactory.getErrorTypeAwareLogger(TripleServerStream.class);
public final ServerTransportObserver transportObserver = new ServerTransportObserver();
private final TripleWriteQueue writeQueue;
private final PathResolver pathResolver;
Expand Down Expand Up @@ -408,11 +410,10 @@ private void processHeader(Http2Headers headers, boolean endStream) {
}
}

Map<String, Object> requestMetadata = headersToMap(headers, () -> {
return Optional.ofNullable(headers.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader()))
.map(CharSequence::toString)
.orElse(null);
});
Map<String, Object> requestMetadata = headersToMap(headers, () ->
Optional.ofNullable(headers.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader()))
.map(CharSequence::toString)
.orElse(null));
boolean hasStub = pathResolver.hasNativeStub(path);
if (hasStub) {
listener = new StubAbstractServerCall(invoker, TripleServerStream.this,
Expand All @@ -431,7 +432,15 @@ private void processHeader(Http2Headers headers, boolean endStream) {

@Override
public void onData(ByteBuf data, boolean endStream) {
executor.execute(() -> doOnData(data, endStream));
try {
executor.execute(() -> doOnData(data, endStream));
} catch (Throwable t) {
// Tasks will be rejected when the thread pool is closed or full,
// ByteBuf needs to be released to avoid out of heap memory leakage.
// For example, ThreadLessExecutor will be shutdown when request timeout {@link AsyncRpcResult}
ReferenceCountUtil.release(data);
LOGGER.error(PROTOCOL_FAILED_REQUEST, "", "", "submit onData task failed", t);
}
}

private void doOnData(ByteBuf data, boolean endStream) {
Expand All @@ -454,10 +463,8 @@ public void cancelByRemote(long errorCode) {
if (listener == null) {
return;
}
executor.execute(() -> {
listener.onCancelByRemote(TriRpcStatus.CANCELLED
.withDescription("Canceled by client ,errorCode=" + errorCode));
});
executor.execute(() -> listener.onCancelByRemote(TriRpcStatus.CANCELLED
.withDescription("Canceled by client ,errorCode=" + errorCode)));
}
}

Expand Down