Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

throw InterruptedException in FailoverClusterInvoker #10188

Closed
happytimor opened this issue Jun 21, 2022 · 12 comments
Closed

throw InterruptedException in FailoverClusterInvoker #10188

happytimor opened this issue Jun 21, 2022 · 12 comments
Labels
type/discussion Everything related with code discussion or question

Comments

@happytimor
Copy link
Contributor

happytimor commented Jun 21, 2022

最近开发一个类似流程任务的一个功能, 核心就是在线程池里运行任务, 代码如下:

RUNNER_EXECUTOR.execute(() -> {
    pipelineService.startTask(taskId);
});

后来业务需要进行支持中断操作, 于是调整代码如下:

//启动逻辑
Future<?> future = this.RUNNER_EXECUTOR.submit(() -> {
    this.pipelineService.startTask(taskId);
});
TASK_FUTURE_MAP.put(taskId, future);
//中断逻辑
TASK_FUTURE_MAP.remove(taskId).cancel(true);

运行后发现任务无法被中断,查了下,发现最终是因为startTask方法里面的dubbo调用有设置重试次数(大部分时间是需要重试),failover策略会把InterruptException当做普通异常,直接重试了(打断失败)。

我觉得应该把InterruptException直接抛出来, 于是做了一个pr如下: #10164

//Brief code for FailoverClusterInvoker.java
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) {
    for (int i = 0; i < len; i++) {
        try {
            Result result = invoker.invoke(invocation);
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            }
            
            //this is my change,  merge code start
            if (e.getCause().toString().equals(InterruptedException.class.getName())) {
                throw new RuntimeException(e);
            }
            //merge code end
            
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException();

想问下:

  1. 这个pr有没有合并价值
  2. 如果不这么做的话,怎么实现我的中断需求比较合理
@happytimor happytimor added the type/discussion Everything related with code discussion or question label Jun 21, 2022
@24kpure
Copy link
Contributor

24kpure commented Jun 24, 2022

Just in my opinitions,for consumers, exceptions thrown by providers during business operations can be converted to business exceptions.

@AlbumenJ
Copy link
Member

  1. 打断重试的这个需求应该是我们需要支持的,只是实现的方式需要考虑下
  2. 是否可以在 catch 异常的时候判断当前线程是否是已经被打断的,以此来确定是否需要把报错外发

@happytimor
Copy link
Contributor Author

  1. 打断重试的这个需求应该是我们需要支持的,只是实现的方式需要考虑下
  2. 是否可以在 catch 异常的时候判断当前线程是否是已经被打断的,以此来确定是否需要把报错外发

写了一个简单的代码模拟下:

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        Future<?> submit = threadPoolExecutor.submit(() -> {
            System.out.println("before: " + Thread.currentThread().isInterrupted());
            dubboInvoke();
            System.out.println("end: " + Thread.currentThread().isInterrupted());
        });
        Thread.sleep(1000);
        submit.cancel(true);
    }


    static void dubboInvoke() {
        try {
            interruptError();
        } catch (Exception e) {
            //retry invoke
        }
    }

    static void interruptError() throws Exception {
        try {
            Thread.sleep(5000);
            System.out.println("wake up");
        } catch (Exception e) {
            //contains interrupt exception(RpcException)
            throw e;
        }
    }

控制台输出信息:

before: false
end: false

invoke产生的InterruptException没有被抛出, 外层通过 Thread.currentThread().isInterrupted() 方法是感知不到的

@AlbumenJ
Copy link
Member

  1. 打断重试的这个需求应该是我们需要支持的,只是实现的方式需要考虑下
  2. 是否可以在 catch 异常的时候判断当前线程是否是已经被打断的,以此来确定是否需要把报错外发

写了一个简单的代码模拟下:

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        Future<?> submit = threadPoolExecutor.submit(() -> {
            System.out.println("before: " + Thread.currentThread().isInterrupted());
            dubboInvoke();
            System.out.println("end: " + Thread.currentThread().isInterrupted());
        });
        Thread.sleep(1000);
        submit.cancel(true);
    }


    static void dubboInvoke() {
        try {
            interruptError();
        } catch (Exception e) {
            //retry invoke
        }
    }

    static void interruptError() throws Exception {
        try {
            Thread.sleep(5000);
            System.out.println("wake up");
        } catch (Exception e) {
            //contains interrupt exception(RpcException)
            throw e;
        }
    }

控制台输出信息:

before: false
end: false

invoke产生的InterruptException没有被抛出, 外层通过 Thread.currentThread().isInterrupted() 方法是感知不到的

这里应该在 catch 异常的时候将线程的 isInterrupted 状态置上去,如果目前 Dubbo 没有做的话也需要完善下

@happytimor
Copy link
Contributor Author

  1. 打断重试的这个需求应该是我们需要支持的,只是实现的方式需要考虑下
  2. 是否可以在 catch 异常的时候判断当前线程是否是已经被打断的,以此来确定是否需要把报错外发

写了一个简单的代码模拟下:

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        Future<?> submit = threadPoolExecutor.submit(() -> {
            System.out.println("before: " + Thread.currentThread().isInterrupted());
            dubboInvoke();
            System.out.println("end: " + Thread.currentThread().isInterrupted());
        });
        Thread.sleep(1000);
        submit.cancel(true);
    }


    static void dubboInvoke() {
        try {
            interruptError();
        } catch (Exception e) {
            //retry invoke
        }
    }

    static void interruptError() throws Exception {
        try {
            Thread.sleep(5000);
            System.out.println("wake up");
        } catch (Exception e) {
            //contains interrupt exception(RpcException)
            throw e;
        }
    }

控制台输出信息:

before: false
end: false

invoke产生的InterruptException没有被抛出, 外层通过 Thread.currentThread().isInterrupted() 方法是感知不到的

这里应该在 catch 异常的时候将线程的 isInterrupted 状态置上去,如果目前 Dubbo 没有做的话也需要完善下

看了下源代码是没有做个操作的,所以写了这个pr #10164 , 我觉得核心是用户主动打断产生的异常, 不应该被重试(即使设置了重试次数)

@AlbumenJ
Copy link
Member

  1. 打断重试的这个需求应该是我们需要支持的,只是实现的方式需要考虑下
  2. 是否可以在 catch 异常的时候判断当前线程是否是已经被打断的,以此来确定是否需要把报错外发

写了一个简单的代码模拟下:

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        Future<?> submit = threadPoolExecutor.submit(() -> {
            System.out.println("before: " + Thread.currentThread().isInterrupted());
            dubboInvoke();
            System.out.println("end: " + Thread.currentThread().isInterrupted());
        });
        Thread.sleep(1000);
        submit.cancel(true);
    }


    static void dubboInvoke() {
        try {
            interruptError();
        } catch (Exception e) {
            //retry invoke
        }
    }

    static void interruptError() throws Exception {
        try {
            Thread.sleep(5000);
            System.out.println("wake up");
        } catch (Exception e) {
            //contains interrupt exception(RpcException)
            throw e;
        }
    }

控制台输出信息:

before: false
end: false

invoke产生的InterruptException没有被抛出, 外层通过 Thread.currentThread().isInterrupted() 方法是感知不到的

这里应该在 catch 异常的时候将线程的 isInterrupted 状态置上去,如果目前 Dubbo 没有做的话也需要完善下

看了下源代码是没有做个操作的,所以写了这个pr #10164 , 我觉得核心是用户主动打断产生的异常, 不应该被重试(即使设置了重试次数)

#10164 的问题是 InterruptedException.class 这个异常可能是服务端抛出来的异常(即是业务异常),不一定是客户端的异常。所以直接判断 cause 为 InterruptedException 是有问题的。

@happytimor
Copy link
Contributor Author

  1. 打断重试的这个需求应该是我们需要支持的,只是实现的方式需要考虑下
  2. 是否可以在 catch 异常的时候判断当前线程是否是已经被打断的,以此来确定是否需要把报错外发

写了一个简单的代码模拟下:

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        Future<?> submit = threadPoolExecutor.submit(() -> {
            System.out.println("before: " + Thread.currentThread().isInterrupted());
            dubboInvoke();
            System.out.println("end: " + Thread.currentThread().isInterrupted());
        });
        Thread.sleep(1000);
        submit.cancel(true);
    }


    static void dubboInvoke() {
        try {
            interruptError();
        } catch (Exception e) {
            //retry invoke
        }
    }

    static void interruptError() throws Exception {
        try {
            Thread.sleep(5000);
            System.out.println("wake up");
        } catch (Exception e) {
            //contains interrupt exception(RpcException)
            throw e;
        }
    }

控制台输出信息:

before: false
end: false

invoke产生的InterruptException没有被抛出, 外层通过 Thread.currentThread().isInterrupted() 方法是感知不到的

这里应该在 catch 异常的时候将线程的 isInterrupted 状态置上去,如果目前 Dubbo 没有做的话也需要完善下

看了下源代码是没有做个操作的,所以写了这个pr #10164 , 我觉得核心是用户主动打断产生的异常, 不应该被重试(即使设置了重试次数)

#10164 的问题是 InterruptedException.class 这个异常可能是服务端抛出来的异常(即是业务异常),不一定是客户端的异常。所以直接判断 cause 为 InterruptedException 是有问题的。

也有道理, 那就只能想办法来判断错误发生在客户端还是服务端了

@AlbumenJ
Copy link
Member

AlbumenJ commented Jul 1, 2022

@happytimor 从 isInterrupted 状态 + InterruptedException.class 理论上是可以判断的,你可以试试

@AlbumenJ AlbumenJ reopened this Jul 1, 2022
@happytimor
Copy link
Contributor Author

@happytimor 从 isInterrupted 状态 + InterruptedException.class 理论上是可以判断的,你可以试试

我试下

@happytimor
Copy link
Contributor Author

@AlbumenJ debug看了下,服务端的异常不会出现在RpcException里面, 对于dubbo来说, 这是一个正常的返回

请看截图
image

@AlbumenJ
Copy link
Member

AlbumenJ commented Jul 1, 2022

你可以试下服务端请求中,Filter 里面抛出 InterruptedException

@happytimor
Copy link
Contributor Author

#10164 have a look @AlbumenJ
只需要打断当前线程就好了, 重试的时候,会在 asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); 方法里面被LinkedBlockingQueue.take()方法打断

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/discussion Everything related with code discussion or question
Projects
None yet
Development

No branches or pull requests

3 participants