Skip to content

Commit

Permalink
fix #9582, potential of fork cluster caused by timeout(#9613)
Browse files Browse the repository at this point in the history
  • Loading branch information
codeimport committed Mar 15, 2022
1 parent 7c177fa commit 7cf49db
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 8 deletions.
Expand Up @@ -34,9 +34,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.FORKS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_FORKS;

/**
Expand Down Expand Up @@ -66,7 +64,6 @@ public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, L
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
Expand Down Expand Up @@ -96,7 +93,7 @@ public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, L
});
}
try {
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
Object ret = ref.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
Expand Down
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -104,6 +105,38 @@ private void resetInvokerToNoException() {
given(invoker3.getInterface()).willReturn(ForkingClusterInvokerTest.class);
}

private void resetInvokerToTimeout() {
given(invoker1.invoke(invocation)).willThrow(new RpcException(RpcException.TIMEOUT_EXCEPTION));
given(invoker1.getUrl()).willReturn(url);
given(invoker1.isAvailable()).willReturn(true);
given(invoker1.getInterface()).willReturn(ForkingClusterInvokerTest.class);

given(invoker2.invoke(invocation)).willThrow(new RpcException(RpcException.TIMEOUT_EXCEPTION));
given(invoker2.getUrl()).willReturn(url);
given(invoker2.isAvailable()).willReturn(true);
given(invoker2.getInterface()).willReturn(ForkingClusterInvokerTest.class);

given(invoker3.invoke(invocation)).willThrow(new RpcException(RpcException.TIMEOUT_EXCEPTION));
given(invoker3.getUrl()).willReturn(url);
given(invoker3.isAvailable()).willReturn(true);
given(invoker3.getInterface()).willReturn(ForkingClusterInvokerTest.class);
}


@Test
public void testInvokeTimeout() {
resetInvokerToTimeout();
ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<>(dic);

try {
invoker.invoke(invocation);
Assertions.fail();
} catch (RpcException expected) {
assertTrue(expected.getMessage().contains("Failed to forking invoke provider"));
assertTrue(expected.getCause() instanceof RpcException);
}
}

@Test
public void testInvokeException() {
resetInvokerToException();
Expand All @@ -114,7 +147,7 @@ public void testInvokeException() {
invoker.invoke(invocation);
Assertions.fail();
} catch (RpcException expected) {
Assertions.assertTrue(expected.getMessage().contains("Failed to forking invoke provider"));
assertTrue(expected.getMessage().contains("Failed to forking invoke provider"));
assertFalse(expected.getCause() instanceof RpcException);
}
}
Expand All @@ -131,16 +164,16 @@ public void testClearRpcContext() {
RpcContext.getContext().setAttachment(attachKey, attachValue);

Map<String, Object> attachments = RpcContext.getContext().getObjectAttachments();
Assertions.assertTrue(attachments != null && attachments.size() == 1, "set attachment failed!");
assertTrue(attachments != null && attachments.size() == 1, "set attachment failed!");
try {
invoker.invoke(invocation);
Assertions.fail();
} catch (RpcException expected) {
Assertions.assertTrue(expected.getMessage().contains("Failed to forking invoke provider"), "Succeeded to forking invoke provider !");
assertTrue(expected.getMessage().contains("Failed to forking invoke provider"), "Succeeded to forking invoke provider !");
assertFalse(expected.getCause() instanceof RpcException);
}
Map<String, Object> afterInvoke = RpcContext.getContext().getObjectAttachments();
Assertions.assertTrue(afterInvoke != null && afterInvoke.size() == 0, "clear attachment failed!");
assertTrue(afterInvoke != null && afterInvoke.size() == 0, "clear attachment failed!");
}

@Test()
Expand Down

0 comments on commit 7cf49db

Please sign in to comment.