Dubbo 2.7.11: responses lost on the Consumer side #9299

Closed
fl061157 opened this issue Nov 19, 2021 · 5 comments
Labels
type/bug Bugs to being fixed

Comments

@fl061157

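Background:
Under load testing, a backend microservice restarted abnormally. After the backend recovered, responses were lost on the caller side.
Symptom:
The caller (a fully asynchronous environment) could not return responses, and Nginx closed the connections on timeout.
Analysis:
According to the logs, the Dubbo Consumer never produced a response, yet a tcpdump capture on the host showed both the requests and their responses on the wire.
A heap dump taken with jmap showed DefaultFuture instances accumulating continuously (after the failure, incoming traffic was already down to single-digit requests per second). Reading the code:
A DefaultFuture is removed from the Map in one of two ways: when the response arrives and is processed, or when the HashedWheelTimer task handles the timeout.
Continuous accumulation can only mean that neither of these two threads was doing its work.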
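
The two removal paths described above can be modelled with plain JDK classes. This is a simplified sketch, not Dubbo's DefaultFuture: the class name PendingRequests and its methods are hypothetical, and a ScheduledExecutorService stands in for the HashedWheelTimer. It only illustrates why entries pile up when neither the response-handling thread nor the timeout task runs.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Simplified model of a pending-request table (hypothetical, not Dubbo code). */
class PendingRequests {
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Stand-in for the timeout timer; a daemon thread so it never blocks JVM exit.
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timeout-timer");
                t.setDaemon(true);
                return t;
            });

    /** Registers a request and schedules its timeout (removal path 2). */
    CompletableFuture<String> register(long id, long timeoutMillis) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        timer.schedule(() -> {
            CompletableFuture<String> f = pending.remove(id);
            if (f != null) {
                f.completeExceptionally(new TimeoutException("request " + id + " timed out"));
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return future;
    }

    /** Removal path 1: called by the thread that handles an incoming response. */
    void received(long id, String response) {
        CompletableFuture<String> f = pending.remove(id);
        if (f != null) {
            f.complete(response);
        }
    }

    /** Number of requests still waiting for a response or a timeout. */
    int size() {
        return pending.size();
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

If received is never called and the timer task never fires, size() only grows, which matches the heap-dump observation.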

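A thread dump taken with jstack showed that the DubboClientHandler-thread- threads no longer existed. These are the threads Dubbo uses by default on the Consumer side to consume responses; they run the ChannelEventRunnable tasks. That also explains why DefaultFuture.received was never executed.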

Reading the code again: ChannelEventRunnable catches all exceptions, so how did the thread crash?

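Notes:
1: The IO thread NettyClientWorker was alive the whole time, so once the backend microservice recovered it should have been able to handle IO events correctly.
2: DubboClientHandler-thread- uses our default configuration: CachedThreadPool with no queue setting, which means it defaults to a SynchronousQueue.
3: The entire Consumer side is asynchronous.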
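
As background for note 2, the JDK's own cached thread pool can be inspected directly. This is only a sketch of the JDK behaviour (the class name CachedPoolInspection is made up for illustration); Dubbo's CachedThreadPool is configured through URL parameters and may use different values.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedPoolInspection {
    /** Returns a short description of the JDK cached thread pool's configuration. */
    static String describe() {
        ExecutorService service = Executors.newCachedThreadPool();
        try {
            ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
            // A SynchronousQueue holds no elements: each task is handed straight to a thread.
            boolean handoff = pool.getQueue() instanceof SynchronousQueue;
            return "synchronousQueue=" + handoff
                    + ", queueCapacity=" + pool.getQueue().remainingCapacity()
                    + ", corePoolSize=" + pool.getCorePoolSize()
                    + ", keepAliveSeconds=" + pool.getKeepAliveTime(TimeUnit.SECONDS);
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints: synchronousQueue=true, queueCapacity=0, corePoolSize=0, keepAliveSeconds=60
        System.out.println(describe());
    }
}
```

With a core size of 0 and a keep-alive time, idle workers in such a pool are reclaimed, so a thread dump of a quiet pool may list no worker threads at all; whether that applies to this incident is not established here.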

@fl061157 fl061157 added the type/bug Bugs to being fixed label Nov 19, 2021
@zrlw
Contributor

zrlw commented Nov 23, 2021

This may be the same problem as #7815, which was fixed in 2.7.12.

@fl061157
Author

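@zrlw Yes. We later replaced the dispatcher (String DISPATCHER_KEY = "dispatcher";) with our own extension that also wraps the exceptions, and the problem has not reproduced since.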

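import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Dispatcher;

// Custom Dispatcher that routes channel events through DubboRequestChannelHandler.
public class DubboRequestDispatcher implements Dispatcher {

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new DubboRequestChannelHandler(handler, url);
    }

}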

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.ExecutionException;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable;
import org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;
// Logger and LoggerFactory: the original post does not show which logging API is imported.

public class DubboRequestChannelHandler extends WrappedChannelHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(DubboRequestChannelHandler.class);

    // The executor's initializer is elided in the original post.
    public final static Executor REQUEST_EXEC = //;

    public DubboRequestChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        try {
            Runnable runnable = new ChannelEventRunnable(channel, handler, ChannelEventRunnable.ChannelState.CONNECTED);
            InnerRunnableWrapper rw = new InnerRunnableWrapper(runnable);
            REQUEST_EXEC.execute(rw);
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        try {
            Runnable runnable = new ChannelEventRunnable(channel, handler, ChannelEventRunnable.ChannelState.DISCONNECTED);
            InnerRunnableWrapper rw = new InnerRunnableWrapper(runnable);
            REQUEST_EXEC.execute(rw);
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        try {
            Runnable runnable = new ChannelEventRunnable(channel, handler, ChannelEventRunnable.ChannelState.RECEIVED, message);
            InnerRunnableWrapper rw = new InnerRunnableWrapper(runnable);
            REQUEST_EXEC.execute(rw);
        } catch (Throwable t) {
            // If the executor rejected a request, report the failure back to the sender.
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        try {
            Runnable runnable = new ChannelEventRunnable(channel, handler, ChannelEventRunnable.ChannelState.CAUGHT, exception);
            InnerRunnableWrapper rw = new InnerRunnableWrapper(runnable);
            REQUEST_EXEC.execute(rw);
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

    // Catches every Throwable so that a failing task cannot terminate the worker thread.
    private static class InnerRunnableWrapper implements Runnable {

        private final Runnable runnable;

        public InnerRunnableWrapper(Runnable runnable) {
            this.runnable = runnable;
        }

        @Override
        public void run() {
            try {
                runnable.run();
            } catch (Throwable throwable) {
                LOGGER.error("run channelHandler error.", throwable);
            }
        }
    }

}
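
The effect of a wrapper like InnerRunnableWrapper can be reproduced with plain JDK classes, without Dubbo. The sketch below is illustrative only (WorkerSurvivalDemo, guarded and sameWorkerAfterFailure are hypothetical names): it runs a failing task followed by a healthy one on a single-thread pool and reports whether the same worker thread ran both. A throwable that escapes a task passed to execute() ends the worker thread, and the pool starts a replacement; a task that catches Throwable keeps its worker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class WorkerSurvivalDemo {
    /** Same idea as InnerRunnableWrapper: nothing escapes run(). */
    static Runnable guarded(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                System.err.println("task failed: " + t);
            }
        };
    }

    /** Runs a failing task, then a healthy one; true if one worker thread ran both. */
    static boolean sameWorkerAfterFailure(boolean guard) {
        ExecutorService pool = Executors.newFixedThreadPool(1, r -> {
            Thread t = new Thread(r);
            // Swallow the uncaught exception report to keep the demo output quiet.
            t.setUncaughtExceptionHandler((thread, error) -> { });
            return t;
        });
        try {
            AtomicReference<Thread> first = new AtomicReference<>();
            AtomicReference<Thread> second = new AtomicReference<>();
            CountDownLatch done = new CountDownLatch(1);
            Runnable failing = () -> {
                first.set(Thread.currentThread());
                throw new IllegalStateException("simulated failure");
            };
            pool.execute(guard ? guarded(failing) : failing);
            pool.execute(() -> {
                second.set(Thread.currentThread());
                done.countDown();
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("second task did not run");
            }
            return first.get() == second.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded, same worker: " + sameWorkerAfterFailure(false));
        System.out.println("guarded, same worker:   " + sameWorkerAfterFailure(true));
    }
}
```

Note that catching Exception alone would not cover an Error, which is one reason the wrapper above catches Throwable.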

@zrlw
Contributor

zrlw commented Nov 24, 2021

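The overridden methods of DubboRequestChannelHandler also need to catch exceptions; otherwise the DubboClientHandler thread would still be terminated by an exception, wouldn't it?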

@zrlw
Contributor

zrlw commented Nov 24, 2021

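Now I remember: this problem was patched twice. The last fix on the master branch is #8931, and on the 3.0 branch it is #8930. It is fixed in 3.0.4, but the 2.7 series has not had a release with it yet; you can refer to those changes and patch it yourself.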

@fl061157
Author

@zrlw Thanks. All exceptions are now wrapped. The problem should be solved.
