Skip to content

Commit

Permalink
Merge pull request #2833 from cszxyang/polish_20220419
Browse files Browse the repository at this point in the history
异常处理、日志处理及部分代码简单美化
  • Loading branch information
xuxueli committed May 21, 2022
2 parents fbc02e5 + d55068a commit a466a6c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 40 deletions.
Expand Up @@ -83,6 +83,7 @@ public void start() throws Exception {
// init executor-server
initEmbedServer(address, ip, port, appname, accessToken);
}

public void destroy(){
// destroy executor-server
stopEmbedServer();
Expand Down Expand Up @@ -131,6 +132,7 @@ private void initAdminBizList(String adminAddresses, String accessToken) throws
}
}
}

public static List<AdminBiz> getAdminBizList(){
return adminBizList;
}
Expand Down Expand Up @@ -251,6 +253,7 @@ public static JobThread registJobThread(int jobId, IJobHandler handler, String r

return newJobThread;
}

public static JobThread removeJobThread(int jobId, String removeOldReason){
JobThread oldJobThread = jobThreadRepository.remove(jobId);
if (oldJobThread != null) {
Expand All @@ -261,9 +264,8 @@ public static JobThread removeJobThread(int jobId, String removeOldReason){
}
return null;
}

public static JobThread loadJobThread(int jobId){
JobThread jobThread = jobThreadRepository.get(jobId);
return jobThread;
return jobThreadRepository.get(jobId);
}

}
64 changes: 27 additions & 37 deletions xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java
Expand Up @@ -36,10 +36,8 @@ public class EmbedServer {
public void start(final String address, final int port, final String appname, final String accessToken) {
executorBiz = new ExecutorBizImpl();
thread = new Thread(new Runnable() {

@Override
public void run() {

// param
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
Expand All @@ -61,8 +59,6 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
}
});


try {
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
Expand Down Expand Up @@ -92,11 +88,9 @@ public void initChannel(SocketChannel channel) throws Exception {
future.channel().closeFuture().sync();

} catch (InterruptedException e) {
if (e instanceof InterruptedException) {
logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
} else {
logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
}
logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
} finally {
// stop
try {
Expand All @@ -106,17 +100,15 @@ public void initChannel(SocketChannel channel) throws Exception {
logger.error(e.getMessage(), e);
}
}

}

});
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start();
}

public void stop() throws Exception {
// destroy server thread
if (thread!=null && thread.isAlive()) {
if (thread != null && thread.isAlive()) {
thread.interrupt();
}

Expand All @@ -130,7 +122,7 @@ public void stop() throws Exception {

/**
* netty_http
*
* <p>
* Copy from : https://github.com/xuxueli/xxl-rpc
*
* @author xuxueli 2015-11-24 22:25:15
Expand All @@ -141,6 +133,7 @@ public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<F
private ExecutorBiz executorBiz;
private String accessToken;
private ThreadPoolExecutor bizThreadPool;

public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
this.executorBiz = executorBiz;
this.accessToken = accessToken;
Expand All @@ -149,7 +142,6 @@ public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, Threa

@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

// request parse
//final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
String requestData = msg.content().toString(CharsetUtil.UTF_8);
Expand All @@ -175,38 +167,38 @@ public void run() {
}

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

// valid
if (HttpMethod.POST != httpMethod) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
if (uri==null || uri.trim().length()==0) {
if (uri == null || uri.trim().length() == 0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
if (accessToken!=null
&& accessToken.trim().length()>0
if (accessToken != null
&& accessToken.trim().length() > 0
&& !accessToken.equals(accessTokenReq)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}

// services mapping
try {
if ("/beat".equals(uri)) {
return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
switch (uri) {
case "/beat":
return executorBiz.beat();
case "/idleBeat":
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
case "/run":
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
case "/kill":
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
case "/log":
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
default:
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
Expand Down Expand Up @@ -261,6 +253,4 @@ public void stopRegistry() {
// stop registry
ExecutorRegistryThread.getInstance().toStop();
}


}

0 comments on commit a466a6c

Please sign in to comment.