Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interrupt current thread when InterruptException occur #10164

Merged
merged 2 commits into from Jul 11, 2022
Merged

Interrupt current thread when InterruptException occur #10164

merged 2 commits into from Jul 11, 2022

Conversation

happytimor
Copy link
Contributor

@happytimor happytimor commented Jun 15, 2022

What is the purpose of the change

Interrupt current thread when InterruptException occur

org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker.java

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult = invoker.invoke(invocation);
        try {
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            // interrupt thread
            Thread.currentThread().interrupt();
            throw new RpcException("Interrupted unexpectedly while waiting for remote result to return!  method: " +
                    invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } 
        return asyncResult;
    }

Brief changelog

Interrupt current thread when InterruptException occur

Verifying this change

Checklist

  • Make sure there is a GitHub_issue field for the change (usually before you start working on it). Trivial changes like typos do not require a GitHub issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
  • Each commit in the pull request should have a meaningful subject line and body.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Check if is necessary to patch to Dubbo 3 if you are work on Dubbo 2.7
  • Write necessary unit-test to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add sample in dubbo samples project.
  • Add some description to dubbo-website project if you are requesting to add a feature.
  • GitHub Actions works fine on your own branch.
  • If this contribution is large, please follow the Software Donation Guide.

@codecov-commenter
Copy link

codecov-commenter commented Jun 20, 2022

Codecov Report

Merging #10164 (8d6a8c1) into master (9efe21a) will increase coverage by 0.33%.
The diff coverage is 0.00%.

@@             Coverage Diff              @@
##             master   #10164      +/-   ##
============================================
+ Coverage     60.52%   60.86%   +0.33%     
- Complexity      412      447      +35     
============================================
  Files          1100     1100              
  Lines         44569    44571       +2     
  Branches       6492     6493       +1     
============================================
+ Hits          26976    27126     +150     
+ Misses        14624    14472     -152     
- Partials       2969     2973       +4     
Impacted Files Coverage Δ
...bo/rpc/cluster/support/FailoverClusterInvoker.java 85.18% <0.00%> (-3.28%) ⬇️
...dubbo/remoting/exchange/support/DefaultFuture.java 88.28% <0.00%> (-5.41%) ⬇️
...port/identifier/BaseServiceMetadataIdentifier.java 53.57% <0.00%> (-3.58%) ⬇️
.../apache/dubbo/config/bootstrap/DubboBootstrap.java 54.47% <0.00%> (-0.16%) ⬇️
...ng/transport/dispatcher/all/AllChannelHandler.java 89.65% <0.00%> (ø)
...n/java/org/apache/dubbo/common/utils/NetUtils.java 68.81% <0.00%> (+0.32%) ⬆️
...rg/apache/dubbo/common/timer/HashedWheelTimer.java 79.72% <0.00%> (+0.34%) ⬆️
...apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java 63.85% <0.00%> (+0.35%) ⬆️
...org/apache/dubbo/registry/redis/RedisRegistry.java 58.42% <0.00%> (+0.37%) ⬆️
...n/java/org/apache/dubbo/config/AbstractConfig.java 74.35% <0.00%> (+0.64%) ⬆️
... and 23 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 9efe21a...8d6a8c1. Read the comment docs.

Comment on lines 96 to 99
if (e.getCause() != null && InterruptedException.class.getName().equals(e.getCause().toString())) {
// don`t catch interrupt exception
throw new RuntimeException(e);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can choose other fault tolerance strategies, such as cluster="failfast"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a conflict, we need failover strategy at most time, but sometimes due to business needs, I need to interrupt thread pool tasks(which contains dubbo invoke)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interrupting a thread is not a common scenario, and I don't approve of modifying non-generic logic.
Based on your scenario, I think you may need a custom Cluster implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my code is like this

Future<?> future =RUNNER_EXECUTOR.submit(() -> {
    pipelineService.startTask(taskId);
});
//because some logic error, i need find the future and  cancel task
future.cancel();

i think it is a normal scenario......

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, if it is not normal behavior to interrupt the thread, dubbo still needs to initiate a retry. If it is modified according to the above logic, the retry capability will be invalid.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i want disable the retry capability when it is initiative inerrupted by login user
i have to copy failover class and use SPI to override the logic at production which seems ugly

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest that you can first create an issue to discuss this requirement, so that more people can see and participate in the discussion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe later, thanks for your patient reply.

@happytimor happytimor changed the title throw InterruptedException in FailoverClusterInvoker Interrupt current thread when InterruptException occur Jul 8, 2022
@AlbumenJ
Copy link
Member

AlbumenJ commented Jul 8, 2022

would you please add some test cases to verify this

@happytimor
Copy link
Contributor Author

happytimor commented Jul 9, 2022

would you please add some test cases to verify this

it is a bit difficult for me to setup the ut environment with mockito, i can only provide basic code

consumer:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
String name = "world" + System.currentTimeMillis();
int initialFlag = 0, runningFlag = 1, normalReturn = 2, interruptReturn = 3;
AtomicInteger flag = new AtomicInteger(initialFlag);
Future<?> future = threadPoolExecutor.submit(() -> {
    try {
        flag.set(runningFlag);
        service.sayHello(name);
        flag.set(normalReturn);
    } catch (Exception e) {
        flag.set(interruptReturn);
    }
});
Thread.sleep(100);
future.cancel(true);
while (flag.get() == initialFlag || flag.get() == runningFlag) {
    Thread.yield();
}
Assertions.assertEquals(flag.get(), interruptReturn);

provider:

String lastName = "";

@Override
public String sayHello(String name) {
    if (!lastName.equals(name)) {
        lastName = name;
        try {
            //will be interrupted at first time
            Thread.sleep(200);
        } catch (InterruptedException ignore) {

        }
    }
    return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
}

important parameter:

retries=3&cluster=failover


flag will return normalReturn if we do not add Thread.currentThread().interrupt()

@AlbumenJ AlbumenJ merged commit 787d26d into apache:master Jul 11, 2022
@AlbumenJ
Copy link
Member

@happytimor Please apply this pr to 3.0 branch.

@happytimor happytimor deleted the feat_interrupt branch July 11, 2022 01:55
@happytimor happytimor restored the feat_interrupt branch July 11, 2022 01:55
@happytimor
Copy link
Contributor Author

#10299 @AlbumenJ

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
status/don’t-merge No plan to merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants