Skip to content

Commit

Permalink
Fix callback timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
wuwen5 committed Dec 22, 2021
1 parent 55672e4 commit 216369f
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 24 deletions.
Expand Up @@ -35,6 +35,7 @@
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter;
import org.apache.dubbo.rpc.support.RpcUtils;

Expand All @@ -43,8 +44,13 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
import static org.apache.dubbo.remoting.Constants.DEFAULT_REMOTING_SERIALIZATION;
import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY;
import static org.apache.dubbo.rpc.Constants.SERIALIZATION_ID_KEY;
Expand Down Expand Up @@ -205,4 +211,19 @@ protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {

protected abstract Result doInvoke(Invocation invocation) throws Throwable;

protected int calculateTimeout(Invocation invocation, String methodName) {
Object countdown = RpcContext.getContext().get(TIME_COUNTDOWN_KEY);
int timeout = DEFAULT_TIMEOUT;
if (countdown == null) {
timeout = (int) RpcUtils.getTimeout(getUrl(), methodName, RpcContext.getContext(), DEFAULT_TIMEOUT);
if (getUrl().getParameter(ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout); // pass timeout to remote server
}
} else {
TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
timeout = (int) timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout);// pass timeout to remote server
}
return timeout;
}
}
Expand Up @@ -71,7 +71,9 @@ protected Result doInvoke(Invocation invocation) throws Throwable {
currentClient.send(inv, getUrl().getMethodParameter(invocation.getMethodName(), SENT_KEY, false));
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv).thenApply(obj -> (AppResponse) obj);
final String methodName = RpcUtils.getMethodName(invocation);
final int timeout = calculateTimeout(invocation, methodName);
CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout, null).thenApply(obj -> (AppResponse) obj);
return new AsyncRpcResult(appResponseFuture, inv);
}
} catch (RpcException e) {
Expand Down
Expand Up @@ -29,28 +29,21 @@
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
import org.apache.dubbo.rpc.support.RpcUtils;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_VERSION;
import static org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;

Expand Down Expand Up @@ -182,20 +175,4 @@ private void destroyInternal(boolean closeAll) {
}
}
}

private int calculateTimeout(Invocation invocation, String methodName) {
Object countdown = RpcContext.getContext().get(TIME_COUNTDOWN_KEY);
int timeout = DEFAULT_TIMEOUT;
if (countdown == null) {
timeout = (int) RpcUtils.getTimeout(getUrl(), methodName, RpcContext.getContext(), DEFAULT_TIMEOUT);
if (getUrl().getParameter(ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout); // pass timeout to remote server
}
} else {
TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
timeout = (int) timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout);// pass timeout to remote server
}
return timeout;
}
}

0 comments on commit 216369f

Please sign in to comment.