Skip to content

Commit

Permalink
feature:import TagRouter (#3065)
Browse files Browse the repository at this point in the history
  • Loading branch information
lexburner authored and cvictory committed Jan 17, 2019
1 parent ac49a7d commit 3375ec8
Show file tree
Hide file tree
Showing 18 changed files with 594 additions and 302 deletions.
Expand Up @@ -31,7 +31,7 @@
* @see com.alibaba.dubbo.rpc.cluster.Cluster#join(Directory)
* @see com.alibaba.dubbo.rpc.cluster.Directory#list(Invocation)
*/
public interface Router extends Comparable<Router> {
public interface Router extends Comparable<Router>{

/**
* get the router url.
Expand All @@ -51,4 +51,11 @@ public interface Router extends Comparable<Router> {
*/
<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

/**
* Router's priority, used to sort routers.
*
* @return router's priority
*/
int getPriority();

}
Expand Up @@ -28,6 +28,7 @@
import com.alibaba.dubbo.rpc.cluster.Router;
import com.alibaba.dubbo.rpc.cluster.RouterFactory;
import com.alibaba.dubbo.rpc.cluster.router.MockInvokersSelector;
import com.alibaba.dubbo.rpc.cluster.router.tag.TagRouter;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -107,6 +108,7 @@ protected void setRouters(List<Router> routers) {
}
// append mock invoker selector
routers.add(new MockInvokersSelector());
routers.add(new TagRouter());
Collections.sort(routers);
this.routers = routers;
}
Expand Down
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.cluster.router;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.cluster.Router;

public abstract class AbstractRouter implements Router {

protected URL url;
protected int priority;

@Override
public URL getUrl() {
return url;
}

@Override
public int compareTo(Router o) {
return (this.getPriority() < o.getPriority()) ? -1 : ((this.getPriority() == o.getPriority()) ? 0 : 1);
}

public int getPriority() {
return priority;
}
}
Expand Up @@ -21,17 +21,21 @@
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.cluster.Router;

import java.util.ArrayList;
import java.util.List;

/**
* A specific Router designed to realize mock feature.
* If a request is configured to use mock, then this router guarantees that only the invokers with protocol MOCK appear in final the invoker list, all other invokers will be excluded.
*
*/
public class MockInvokersSelector implements Router {
public class MockInvokersSelector extends AbstractRouter {

private static final int DEFAULT_PRIORITY = Integer.MAX_VALUE;

public MockInvokersSelector() {
this.priority = DEFAULT_PRIORITY;
}

@Override
public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
Expand Down Expand Up @@ -87,14 +91,4 @@ private <T> boolean hasMockProviders(final List<Invoker<T>> invokers) {
return hasMockProvider;
}

@Override
public URL getUrl() {
return null;
}

@Override
public int compareTo(Router o) {
return 1;
}

}
Expand Up @@ -27,6 +27,7 @@
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.cluster.Router;
import com.alibaba.dubbo.rpc.cluster.router.AbstractRouter;

import java.text.ParseException;
import java.util.ArrayList;
Expand All @@ -40,21 +41,19 @@

/**
* ConditionRouter
*
*/
public class ConditionRouter implements Router, Comparable<Router> {
public class ConditionRouter extends AbstractRouter {

private static final Logger logger = LoggerFactory.getLogger(ConditionRouter.class);
private static final int DEFAULT_PRIORITY = 2;
private static Pattern ROUTE_PATTERN = Pattern.compile("([&!=,]*)\\s*([^&!=,\\s]+)");
private final URL url;
private final int priority;
private final boolean force;
private final Map<String, MatchPair> whenCondition;
private final Map<String, MatchPair> thenCondition;

public ConditionRouter(URL url) {
this.url = url;
this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
this.priority = url.getParameter(Constants.PRIORITY_KEY, DEFAULT_PRIORITY);
this.force = url.getParameter(Constants.FORCE_KEY, false);
try {
String rule = url.getParameterAndDecoded(Constants.RULE_KEY);
Expand Down
Expand Up @@ -24,7 +24,7 @@
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.cluster.Router;
import com.alibaba.dubbo.rpc.cluster.router.AbstractRouter;

import javax.script.Bindings;
import javax.script.Compilable;
Expand All @@ -40,26 +40,23 @@

/**
* ScriptRouter
*
*/
public class ScriptRouter implements Router {
public class ScriptRouter extends AbstractRouter {

private static final Logger logger = LoggerFactory.getLogger(ScriptRouter.class);

private static final int DEFAULT_PRIORITY = 1;

private static final Map<String, ScriptEngine> engines = new ConcurrentHashMap<String, ScriptEngine>();

private final ScriptEngine engine;

private final int priority;

private final String rule;

private final URL url;

public ScriptRouter(URL url) {
this.url = url;
String type = url.getParameter(Constants.TYPE_KEY);
this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
this.priority = url.getParameter(Constants.PRIORITY_KEY, DEFAULT_PRIORITY);
String rule = url.getParameterAndDecoded(Constants.RULE_KEY);
if (type == null || type.length() == 0) {
type = Constants.DEFAULT_SCRIPT_TYPE_KEY;
Expand All @@ -79,11 +76,6 @@ public ScriptRouter(URL url) {
this.rule = rule;
}

@Override
public URL getUrl() {
return url;
}

@Override
@SuppressWarnings("unchecked")
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
Expand Down Expand Up @@ -114,13 +106,4 @@ public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation
}
}

@Override
public int compareTo(Router o) {
if (o == null || o.getClass() != ScriptRouter.class) {
return 1;
}
ScriptRouter c = (ScriptRouter) o;
return this.priority == c.priority ? rule.compareTo(c.rule) : (this.priority > c.priority ? 1 : -1);
}

}
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.cluster.router.tag;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.cluster.router.AbstractRouter;

import java.util.ArrayList;
import java.util.List;

/**
* TagRouter
*/
public class TagRouter extends AbstractRouter {

private static final int DEFAULT_PRIORITY = 100;
private static final URL ROUTER_URL = new URL("tag", Constants.ANYHOST_VALUE, 0, Constants.ANY_VALUE).addParameters(Constants.RUNTIME_KEY, "true");

public TagRouter() {
this.url = ROUTER_URL;
this.priority = url.getParameter(Constants.PRIORITY_KEY, DEFAULT_PRIORITY);
}

@Override
public URL getUrl() {
return url;
}

@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
// filter
List<Invoker<T>> result = new ArrayList<Invoker<T>>();
// Dynamic param
String tag = RpcContext.getContext().getAttachment(Constants.TAG_KEY);
// Tag request
if (!StringUtils.isEmpty(tag)) {
// Select tag invokers first
for (Invoker<T> invoker : invokers) {
if (tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
result.add(invoker);
}
}
}
// If Constants.REQUEST_TAG_KEY unspecified or no invoker be selected, downgrade to normal invokers
if (result.isEmpty()) {
// Only forceTag = true force match, otherwise downgrade
String forceTag = RpcContext.getContext().getAttachment(Constants.FORCE_USE_TAG);
if (StringUtils.isEmpty(forceTag) || "false".equals(forceTag)) {
for (Invoker<T> invoker : invokers) {
if (StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
result.add(invoker);
}
}
}
}
return result;
}

}
Expand Up @@ -123,7 +123,7 @@ private AbstractLoadBalance getLoadBalance(String loadbalanceName) {

@Test
public void testRoundRobinLoadBalance_select() {
int runs = 10000;
int runs = 1000;
Map<Invoker, AtomicLong> counter = getInvokeCounter(runs, RoundRobinLoadBalance.NAME);
for (Invoker minvoker : counter.keySet()) {
Long count = counter.get(minvoker).get();
Expand All @@ -146,7 +146,7 @@ private void assertStrictWRRResult(int runs, Map<Invoker, InvokeResult> resultMa
public void testRoundRobinLoadBalanceWithWeight() {
final Map<Invoker, InvokeResult> totalMap = new HashMap<Invoker, InvokeResult>();
final AtomicBoolean shouldBegin = new AtomicBoolean(false);
final int runs = 10000;
final int runs = 1000;
List<Thread> threads = new ArrayList<Thread>();
int threadNum = 10;
for (int i = 0; i < threadNum; i ++) {
Expand Down Expand Up @@ -188,7 +188,7 @@ public void run() {

@Test
public void testRoundRobinLoadBalanceWithWeightShouldNotRecycle() {
int runs = 10000;
int runs = 1000;
//tmperately add a new invoker
weightInvokers.add(weightInvokerTmp);
try {
Expand Down Expand Up @@ -226,7 +226,7 @@ public void testRoundRobinLoadBalanceWithWeightShouldRecycle() {
Assert.assertTrue("getField failed", true);
}
}
int runs = 10000;
int runs = 1000;
//temporarily add a new invoker
weightInvokers.add(weightInvokerTmp);
try {
Expand All @@ -249,7 +249,7 @@ public void testRoundRobinLoadBalanceWithWeightShouldRecycle() {
public void testSelectByWeightLeastActive() {
int sumInvoker1 = 0;
int sumInvoker2 = 0;
int loop = 10000;
int loop = 1000;
LeastActiveLoadBalance lb = new LeastActiveLoadBalance();
for (int i = 0; i < loop; i++) {
Invoker selected = lb.select(weightInvokers, null, weightTestInvocation);
Expand All @@ -273,7 +273,7 @@ public void testSelectByWeightRandom() {
int sumInvoker1 = 0;
int sumInvoker2 = 0;
int sumInvoker3 = 0;
int loop = 10000;
int loop = 1000;
RandomLoadBalance lb = new RandomLoadBalance();
for (int i = 0; i < loop; i++) {
Invoker selected = lb.select(weightInvokers, null, weightTestInvocation);
Expand Down Expand Up @@ -323,7 +323,7 @@ public void testRandomLoadBalance_select() {

@Test
public void testLeastActiveLoadBalance_select() {
int runs = 10000;
int runs = 1000;
Map<Invoker, AtomicLong> counter = getInvokeCounter(runs, LeastActiveLoadBalance.NAME);
for (Invoker minvoker : counter.keySet()) {
Long count = counter.get(minvoker).get();
Expand Down

0 comments on commit 3375ec8

Please sign in to comment.