diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProtocol.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProtocol.java index d27fcbbae40..d10d383571a 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProtocol.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProtocol.java @@ -1,116 +1,116 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.rpc.protocol; - -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.logger.Logger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.ConcurrentHashSet; -import org.apache.dubbo.remoting.Constants; -import org.apache.dubbo.rpc.Exporter; -import org.apache.dubbo.rpc.Invoker; -import org.apache.dubbo.rpc.Protocol; -import org.apache.dubbo.rpc.ProtocolServer; -import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.support.ProtocolUtils; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; -import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; - -/** - * abstract ProtocolSupport. - */ -public abstract class AbstractProtocol implements Protocol { - - protected final Logger logger = LoggerFactory.getLogger(getClass()); - - protected final Map> exporterMap = new ConcurrentHashMap>(); - - /** - * - */ - protected final Map serverMap = new ConcurrentHashMap<>(); - - //TODO SoftReference - protected final Set> invokers = new ConcurrentHashSet>(); - - protected static String serviceKey(URL url) { - int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort()); - return serviceKey(port, url.getPath(), url.getParameter(VERSION_KEY), url.getParameter(GROUP_KEY)); - } - - protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) { - return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup); - } - - public List getServers() { - return Collections.unmodifiableList(new ArrayList<>(serverMap.values())); - } - - @Override - public void destroy() { - for (Invoker invoker : invokers) { - if (invoker != null) { - invokers.remove(invoker); - try { - if (logger.isInfoEnabled()) { - logger.info("Destroy reference: " + invoker.getUrl()); - } - invoker.destroy(); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); - } - } - } - for (String key : new ArrayList(exporterMap.keySet())) { - Exporter exporter = exporterMap.remove(key); - if (exporter != null) { - try { - if (logger.isInfoEnabled()) { - logger.info("Unexport service: " + exporter.getInvoker().getUrl()); - } - exporter.unexport(); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); - } - } - } - } - - @Override - public Invoker refer(Class type, URL url) throws RpcException { - return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url)); - } - - protected abstract Invoker protocolBindingRefer(Class type, URL url) throws RpcException; - - public Map> getExporterMap() { - return exporterMap; - } - - public Collection> getExporters() { - return Collections.unmodifiableCollection(exporterMap.values()); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.remoting.Constants; +import org.apache.dubbo.rpc.Exporter; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Protocol; +import org.apache.dubbo.rpc.ProtocolServer; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.support.ProtocolUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; + +/** + * abstract ProtocolSupport. + */ +public abstract class AbstractProtocol implements Protocol { + + protected final Logger logger = LoggerFactory.getLogger(getClass()); + + protected final DelegateExporterMap exporterMap = new DelegateExporterMap(); + + /** + * + */ + protected final Map serverMap = new ConcurrentHashMap<>(); + + //TODO SoftReference + protected final Set> invokers = new ConcurrentHashSet>(); + + protected static String serviceKey(URL url) { + int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort()); + return serviceKey(port, url.getPath(), url.getParameter(VERSION_KEY), url.getParameter(GROUP_KEY)); + } + + protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) { + return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup); + } + + @Override + public List getServers() { + return Collections.unmodifiableList(new ArrayList<>(serverMap.values())); + } + + @Override + public void destroy() { + for (Invoker invoker : invokers) { + if (invoker != null) { + invokers.remove(invoker); + try { + if (logger.isInfoEnabled()) { + logger.info("Destroy reference: " + invoker.getUrl()); + } + invoker.destroy(); + } catch (Throwable t) { + logger.warn(t.getMessage(), t); + } + } + } + for (Map.Entry> item : exporterMap.getExporterMap().entrySet()) { + if (exporterMap.removeExportMap(item.getKey(), item.getValue())) { + try { + if (logger.isInfoEnabled()) { + logger.info("Unexport service: " + item.getValue().getInvoker().getUrl()); + } + item.getValue().unexport(); + } catch (Throwable t) { + logger.warn(t.getMessage(), t); + } + } + } + } + + @Override + public Invoker refer(Class type, URL url) throws RpcException { + return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url)); + } + + protected abstract Invoker protocolBindingRefer(Class type, URL url) throws RpcException; + + public Map> getExporterMap() { + return exporterMap.getExporterMap(); + } + + public Collection> getExporters() { + return exporterMap.getExporters(); + } +} diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProxyProtocol.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProxyProtocol.java index c9965a2bbca..c2b6f3dbde1 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProxyProtocol.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProxyProtocol.java @@ -77,7 +77,7 @@ public void setProxyFactory(ProxyFactory proxyFactory) { @SuppressWarnings("unchecked") public Exporter export(final Invoker invoker) throws RpcException { final String uri = serviceKey(invoker.getUrl()); - Exporter exporter = (Exporter) exporterMap.get(uri); + Exporter exporter = (Exporter) exporterMap.getExport(uri); if (exporter != null) { // When modifying the configuration through override, you need to re-expose the newly modified service. if (Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) { @@ -88,7 +88,7 @@ public Exporter export(final Invoker invoker) throws RpcException { exporter = new AbstractExporter(invoker) { @Override public void afterUnExport() { - exporterMap.remove(uri); + exporterMap.removeExportMap(uri, this); if (runnable != null) { try { runnable.run(); @@ -98,7 +98,7 @@ public void afterUnExport() { } } }; - exporterMap.put(uri, exporter); + exporterMap.addExportMap(uri, exporter); return exporter; } @@ -277,5 +277,4 @@ public boolean isClosed() { } } - } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/DelegateExporterMap.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/DelegateExporterMap.java new file mode 100644 index 00000000000..e596f7f56d7 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/DelegateExporterMap.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol; + +import org.apache.dubbo.common.utils.CollectionUtils; +import org.apache.dubbo.rpc.Exporter; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * delegate exportermap oper + */ +public class DelegateExporterMap { + protected final Map> exporterMap = new ConcurrentHashMap>(); + + /** + * check is empty map + * @return + */ + public boolean isEmpty() { + return CollectionUtils.isEmptyMap(exporterMap); + } + + /** + * get export + * @param key + * @return + */ + public Exporter getExport(String key) { + return exporterMap.get(key); + } + + /** + * add + * @param key + * @param exporter + */ + public void addExportMap(String key, Exporter exporter) { + exporterMap.put(key, exporter); + } + + /** + * delete + * @param key + */ + public boolean removeExportMap(String key, Exporter exporter) { + Exporter findExporter = exporterMap.get(key); + if(findExporter == exporter){ + exporterMap.remove(key); + return true; + } + return false; + } + + /** + * get the exports + * @return + */ + public Map> getExporterMap() { + return exporterMap; + } + + /** + * get all exports + * @return + */ + public Collection> getExporters() { + return Collections.unmodifiableCollection(exporterMap.values()); + } +} diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboExporter.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboExporter.java index 57c1954f438..3d64dd2b952 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboExporter.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboExporter.java @@ -16,11 +16,9 @@ */ package org.apache.dubbo.rpc.protocol.dubbo; -import org.apache.dubbo.rpc.Exporter; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.protocol.AbstractExporter; - -import java.util.Map; +import org.apache.dubbo.rpc.protocol.DelegateExporterMap; /** * DubboExporter @@ -29,17 +27,16 @@ public class DubboExporter extends AbstractExporter { private final String key; - private final Map> exporterMap; + private final DelegateExporterMap delegateExporterMap; - public DubboExporter(Invoker invoker, String key, Map> exporterMap) { + public DubboExporter(Invoker invoker, String key, DelegateExporterMap delegateExporterMap) { super(invoker); this.key = key; - this.exporterMap = exporterMap; + this.delegateExporterMap = delegateExporterMap; } @Override public void afterUnExport() { - exporterMap.remove(key); + delegateExporterMap.removeExportMap(key, this); } - } \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 9b6067e349a..220f8bfd5e5 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -225,11 +225,6 @@ public static DubboProtocol getDubboProtocol() { return INSTANCE; } - @Override - public Collection> getExporters() { - return Collections.unmodifiableCollection(exporterMap.values()); - } - private boolean isClientSide(Channel channel) { InetSocketAddress address = channel.getRemoteAddress(); URL url = channel.getUrl(); @@ -263,11 +258,11 @@ Invoker getInvoker(Channel channel, Invocation inv) throws RemotingException (String) inv.getObjectAttachments().get(VERSION_KEY), (String) inv.getObjectAttachments().get(GROUP_KEY) ); - DubboExporter exporter = (DubboExporter) exporterMap.get(serviceKey); + DubboExporter exporter = (DubboExporter) exporterMap.getExport(serviceKey); if (exporter == null) { throw new RemotingException(channel, - "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + + "Not found exported service: " + serviceKey + " in " + exporterMap.getExporterMap().keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv)); } @@ -291,7 +286,7 @@ public Exporter export(Invoker invoker) throws RpcException { // export service. String key = serviceKey(url); DubboExporter exporter = new DubboExporter(invoker, key, exporterMap); - exporterMap.put(key, exporter); + exporterMap.addExportMap(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmExporter.java b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmExporter.java index cd899b49717..307744cff95 100644 --- a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmExporter.java +++ b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmExporter.java @@ -16,11 +16,9 @@ */ package org.apache.dubbo.rpc.protocol.injvm; -import org.apache.dubbo.rpc.Exporter; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.protocol.AbstractExporter; - -import java.util.Map; +import org.apache.dubbo.rpc.protocol.DelegateExporterMap; /** * InjvmExporter @@ -29,18 +27,17 @@ class InjvmExporter extends AbstractExporter { private final String key; - private final Map> exporterMap; + private final DelegateExporterMap delegateExporterMap; - InjvmExporter(Invoker invoker, String key, Map> exporterMap) { + InjvmExporter(Invoker invoker, String key, DelegateExporterMap delegateExporterMap) { super(invoker); this.key = key; - this.exporterMap = exporterMap; - exporterMap.put(key, this); + this.delegateExporterMap = delegateExporterMap; } @Override public void afterUnExport() { - exporterMap.remove(key); + delegateExporterMap.removeExportMap(key, this); } - } + diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java index 8e16d2ccef3..cd3c31c17a6 100644 --- a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java +++ b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java @@ -31,8 +31,8 @@ import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.protocol.AbstractInvoker; +import org.apache.dubbo.rpc.protocol.DelegateExporterMap; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -46,19 +46,19 @@ class InjvmInvoker extends AbstractInvoker { private final String key; - private final Map> exporterMap; + private final DelegateExporterMap delegateExporterMap; private final ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension(); - InjvmInvoker(Class type, URL url, String key, Map> exporterMap) { + InjvmInvoker(Class type, URL url, String key, DelegateExporterMap delegateExporterMap) { super(type, url); this.key = key; - this.exporterMap = exporterMap; + this.delegateExporterMap = delegateExporterMap; } @Override public boolean isAvailable() { - InjvmExporter exporter = (InjvmExporter) exporterMap.get(key); + InjvmExporter exporter = (InjvmExporter) delegateExporterMap.getExport(key); if (exporter == null) { return false; } else { @@ -68,7 +68,7 @@ public boolean isAvailable() { @Override public Result doInvoke(Invocation invocation) throws Throwable { - Exporter exporter = InjvmProtocol.getExporter(exporterMap, getUrl()); + Exporter exporter = InjvmProtocol.getExporter(delegateExporterMap, getUrl()); if (exporter == null) { throw new RpcException("Service [" + key + "] not found."); } diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java index 10df8ee380b..d9cef56074d 100644 --- a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java +++ b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java @@ -18,17 +18,15 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.extension.ExtensionLoader; -import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.UrlUtils; import org.apache.dubbo.rpc.Exporter; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Protocol; import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.protocol.AbstractProtocol; +import org.apache.dubbo.rpc.protocol.DelegateExporterMap; import org.apache.dubbo.rpc.support.ProtocolUtils; -import java.util.Map; - import static org.apache.dubbo.common.constants.CommonConstants.BROADCAST_CLUSTER; import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY; import static org.apache.dubbo.rpc.Constants.SCOPE_KEY; @@ -58,14 +56,14 @@ public static InjvmProtocol getInjvmProtocol() { return INSTANCE; } - static Exporter getExporter(Map> map, URL key) { + static Exporter getExporter(DelegateExporterMap delegateExporterMap, URL key) { Exporter result = null; if (!key.getServiceKey().contains("*")) { - result = map.get(key.getServiceKey()); + result = delegateExporterMap.getExport(key.getServiceKey()); } else { - if (CollectionUtils.isNotEmptyMap(map)) { - for (Exporter exporter : map.values()) { + if (!delegateExporterMap.isEmpty()) { + for (Exporter exporter : delegateExporterMap.getExporters()) { if (UrlUtils.isServiceKeyMatch(key, exporter.getInvoker().getUrl())) { result = exporter; break; @@ -91,7 +89,10 @@ public int getDefaultPort() { @Override public Exporter export(Invoker invoker) throws RpcException { - return new InjvmExporter(invoker, invoker.getUrl().getServiceKey(), exporterMap); + String serviceKey = invoker.getUrl().getServiceKey(); + InjvmExporter tInjvmExporter = new InjvmExporter<>(invoker, serviceKey, exporterMap); + exporterMap.addExportMap(serviceKey, tInjvmExporter); + return tInjvmExporter; } @Override diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocolTest.java b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocolTest.java index 0370c6b72e9..3cc461f14ba 100644 --- a/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocolTest.java +++ b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocolTest.java @@ -24,10 +24,12 @@ import org.apache.dubbo.rpc.Protocol; import org.apache.dubbo.rpc.ProxyFactory; +import org.apache.dubbo.rpc.protocol.DelegateExporterMap; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -74,7 +76,32 @@ public void testLocalProtocol() throws Exception { assertEquals(service.getSize(new String[]{"", "", ""}), 3); service.invoke("injvm://127.0.0.1/TestService", "invoke"); - InjvmInvoker injvmInvoker = new InjvmInvoker(DemoService.class, URL.valueOf("injvm://127.0.0.1/TestService"), null, new HashMap>()); + InjvmInvoker injvmInvoker = new InjvmInvoker(DemoService.class, URL.valueOf("injvm://127.0.0.1/TestService"), null, new DelegateExporterMap() { + @Override + public boolean isEmpty() { + return true; + } + + @Override + public Exporter getExport(String key) { + return null; + } + + @Override + public void addExportMap(String key, Exporter exporter) { + + } + + @Override + public boolean removeExportMap(String key, Exporter exporter) { + return true; + } + + @Override + public Collection> getExporters() { + return null; + } + }); assertFalse(injvmInvoker.isAvailable()); } diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java index 2758c5eaac3..eb68b58aed8 100644 --- a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java +++ b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java @@ -1,272 +1,272 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.rpc.protocol.thrift; - -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.config.ConfigurationUtils; -import org.apache.dubbo.common.extension.ExtensionLoader; -import org.apache.dubbo.common.utils.StringUtils; -import org.apache.dubbo.remoting.Channel; -import org.apache.dubbo.remoting.RemotingException; -import org.apache.dubbo.remoting.RemotingServer; -import org.apache.dubbo.remoting.Transporter; -import org.apache.dubbo.remoting.exchange.ExchangeChannel; -import org.apache.dubbo.remoting.exchange.ExchangeClient; -import org.apache.dubbo.remoting.exchange.ExchangeHandler; -import org.apache.dubbo.remoting.exchange.ExchangeServer; -import org.apache.dubbo.remoting.exchange.Exchangers; -import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter; -import org.apache.dubbo.rpc.Exporter; -import org.apache.dubbo.rpc.Invocation; -import org.apache.dubbo.rpc.Invoker; -import org.apache.dubbo.rpc.ProtocolServer; -import org.apache.dubbo.rpc.Result; -import org.apache.dubbo.rpc.RpcContext; -import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.protocol.AbstractProtocol; -import org.apache.dubbo.rpc.protocol.dubbo.DubboExporter; - -import java.util.ArrayList; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - -import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY; -import static org.apache.dubbo.remoting.Constants.CHANNEL_READONLYEVENT_SENT_KEY; -import static org.apache.dubbo.remoting.Constants.CLIENT_KEY; -import static org.apache.dubbo.remoting.Constants.CODEC_KEY; -import static org.apache.dubbo.remoting.Constants.CONNECTIONS_KEY; -import static org.apache.dubbo.remoting.Constants.SERVER_KEY; -import static org.apache.dubbo.rpc.Constants.IS_SERVER_KEY; - -/** - * @since 2.7.0, use https://github.com/dubbo/dubbo-rpc-native-thrift instead - */ -@Deprecated -public class ThriftProtocol extends AbstractProtocol { - - public static final int DEFAULT_PORT = 40880; - - public static final String NAME = "thrift"; - - private ExchangeHandler handler = new ExchangeHandlerAdapter() { - - @Override - public CompletableFuture reply(ExchangeChannel channel, Object msg) throws RemotingException { - - if (msg instanceof Invocation) { - Invocation inv = (Invocation) msg; - String path = (String) inv.getObjectAttachments().get(PATH_KEY); - String serviceKey = serviceKey(channel.getLocalAddress().getPort(), - path, null, null); - DubboExporter exporter = (DubboExporter) exporterMap.get(serviceKey); - if (exporter == null) { - throw new RemotingException(channel, - "Not found exported service: " - + serviceKey - + " in " - + exporterMap.keySet() - + ", may be version or group mismatch " - + ", channel: consumer: " - + channel.getRemoteAddress() - + " --> provider: " - + channel.getLocalAddress() - + ", message:" + msg); - } - - RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); - - Result result = exporter.getInvoker().invoke(inv); - return result.thenApply(Function.identity()); - } - - throw new RemotingException(channel, - "Unsupported request: " - + (msg.getClass().getName() + ": " + msg) - + ", channel: consumer: " - + channel.getRemoteAddress() - + " --> provider: " - + channel.getLocalAddress()); - } - - @Override - public void received(Channel channel, Object message) throws RemotingException { - if (message instanceof Invocation) { - reply((ExchangeChannel) channel, message); - } else { - super.received(channel, message); - } - } - - }; - - @Override - public int getDefaultPort() { - return DEFAULT_PORT; - } - - @Override - public Exporter export(Invoker invoker) throws RpcException { - - // can use thrift codec only - URL url = invoker.getUrl().addParameter(CODEC_KEY, ThriftCodec.NAME); - // find server. - String key = url.getAddress(); - // client can expose a service for server to invoke only. - boolean isServer = url.getParameter(IS_SERVER_KEY, true); - if (isServer && !serverMap.containsKey(key)) { - serverMap.put(key, getServer(url)); - } - // export service. - key = serviceKey(url); - DubboExporter exporter = new DubboExporter(invoker, key, exporterMap); - exporterMap.put(key, exporter); - - return exporter; - } - - @Override - public void destroy() { - - super.destroy(); - - for (String key : new ArrayList(serverMap.keySet())) { - - ProtocolServer protocolServer = serverMap.remove(key); - - if (protocolServer != null) { - RemotingServer server = protocolServer.getRemotingServer(); - try { - if (logger.isInfoEnabled()) { - logger.info("Close dubbo server: " + server.getLocalAddress()); - } - server.close(ConfigurationUtils.getServerShutdownTimeout()); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); - } - } // ~ end of if ( server != null ) - - } // ~ end of loop serverMap - - } // ~ end of method destroy - - @Override - protected Invoker protocolBindingRefer(Class type, URL url) throws RpcException { - - ThriftInvoker invoker = new ThriftInvoker(type, url, getClients(url), invokers); - - invokers.add(invoker); - - return invoker; - - } - - private ExchangeClient[] getClients(URL url) { - - int connections = url.getParameter(CONNECTIONS_KEY, 1); - - ExchangeClient[] clients = new ExchangeClient[connections]; - - for (int i = 0; i < clients.length; i++) { - clients[i] = initClient(url); - } - return clients; - } - - private ExchangeClient initClient(URL url) { - - ExchangeClient client; - -// url = url.addParameter(CODEC_KEY, ThriftCodec.NAME); - - try { - client = Exchangers.connect(url); - } catch (RemotingException e) { - throw new RpcException("Fail to create remoting client for service(" + url - + "): " + e.getMessage(), e); - } - - return client; - - } - - private ProtocolServer getServer(URL url) { - // enable sending readonly event when server closes by default - url = url.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); - String str = url.getParameter(SERVER_KEY, org.apache.dubbo.rpc.Constants.DEFAULT_REMOTING_SERVER); - - if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { - throw new RpcException("Unsupported server type: " + str + ", url: " + url); - } - - ExchangeServer server; - try { - server = Exchangers.bind(url, handler); - } catch (RemotingException e) { - throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); - } - str = url.getParameter(CLIENT_KEY); - if (str != null && str.length() > 0) { - Set supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); - if (!supportedTypes.contains(str)) { - throw new RpcException("Unsupported client type: " + str); - } - } - return new ThriftProtocolServer(server); - } - - private class ThriftProtocolServer implements ProtocolServer { - - private ExchangeServer server; - private String address; - - public ThriftProtocolServer(ExchangeServer server) { - this.server = server; - } - - @Override - public RemotingServer getRemotingServer() { - return server; - } - - @Override - public String getAddress() { - return StringUtils.isNotEmpty(address) ? address : server.getUrl().getAddress(); - } - - @Override - public void setAddress(String address) { - this.address = address; - } - - @Override - public URL getUrl() { - return server.getUrl(); - } - - @Override - public void reset(URL url) { - server.reset(url); - } - - @Override - public void close() { - server.close(); - } - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.thrift; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.config.ConfigurationUtils; +import org.apache.dubbo.common.extension.ExtensionLoader; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.remoting.Channel; +import org.apache.dubbo.remoting.RemotingException; +import org.apache.dubbo.remoting.RemotingServer; +import org.apache.dubbo.remoting.Transporter; +import org.apache.dubbo.remoting.exchange.ExchangeChannel; +import org.apache.dubbo.remoting.exchange.ExchangeClient; +import org.apache.dubbo.remoting.exchange.ExchangeHandler; +import org.apache.dubbo.remoting.exchange.ExchangeServer; +import org.apache.dubbo.remoting.exchange.Exchangers; +import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter; +import org.apache.dubbo.rpc.Exporter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.ProtocolServer; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcContext; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.protocol.AbstractProtocol; +import org.apache.dubbo.rpc.protocol.dubbo.DubboExporter; + +import java.util.ArrayList; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY; +import static org.apache.dubbo.remoting.Constants.CHANNEL_READONLYEVENT_SENT_KEY; +import static org.apache.dubbo.remoting.Constants.CLIENT_KEY; +import static org.apache.dubbo.remoting.Constants.CODEC_KEY; +import static org.apache.dubbo.remoting.Constants.CONNECTIONS_KEY; +import static org.apache.dubbo.remoting.Constants.SERVER_KEY; +import static org.apache.dubbo.rpc.Constants.IS_SERVER_KEY; + +/** + * @since 2.7.0, use https://github.com/dubbo/dubbo-rpc-native-thrift instead + */ +@Deprecated +public class ThriftProtocol extends AbstractProtocol { + + public static final int DEFAULT_PORT = 40880; + + public static final String NAME = "thrift"; + + private ExchangeHandler handler = new ExchangeHandlerAdapter() { + + @Override + public CompletableFuture reply(ExchangeChannel channel, Object msg) throws RemotingException { + + if (msg instanceof Invocation) { + Invocation inv = (Invocation) msg; + String path = (String) inv.getObjectAttachments().get(PATH_KEY); + String serviceKey = serviceKey(channel.getLocalAddress().getPort(), + path, null, null); + DubboExporter exporter = (DubboExporter) exporterMap.getExport(serviceKey); + if (exporter == null) { + throw new RemotingException(channel, + "Not found exported service: " + + serviceKey + + " in " + + exporterMap.getExporterMap().keySet() + + ", may be version or group mismatch " + + ", channel: consumer: " + + channel.getRemoteAddress() + + " --> provider: " + + channel.getLocalAddress() + + ", message:" + msg); + } + + RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); + + Result result = exporter.getInvoker().invoke(inv); + return result.thenApply(Function.identity()); + } + + throw new RemotingException(channel, + "Unsupported request: " + + (msg.getClass().getName() + ": " + msg) + + ", channel: consumer: " + + channel.getRemoteAddress() + + " --> provider: " + + channel.getLocalAddress()); + } + + @Override + public void received(Channel channel, Object message) throws RemotingException { + if (message instanceof Invocation) { + reply((ExchangeChannel) channel, message); + } else { + super.received(channel, message); + } + } + + }; + + @Override + public int getDefaultPort() { + return DEFAULT_PORT; + } + + @Override + public Exporter export(Invoker invoker) throws RpcException { + + // can use thrift codec only + URL url = invoker.getUrl().addParameter(CODEC_KEY, ThriftCodec.NAME); + // find server. + String key = url.getAddress(); + // client can expose a service for server to invoke only. + boolean isServer = url.getParameter(IS_SERVER_KEY, true); + if (isServer && !serverMap.containsKey(key)) { + serverMap.put(key, getServer(url)); + } + // export service. + key = serviceKey(url); + DubboExporter exporter = new DubboExporter(invoker, key, exporterMap); + exporterMap.addExportMap(key, exporter); + + return exporter; + } + + @Override + public void destroy() { + + super.destroy(); + + for (String key : new ArrayList(serverMap.keySet())) { + + ProtocolServer protocolServer = serverMap.remove(key); + + if (protocolServer != null) { + RemotingServer server = protocolServer.getRemotingServer(); + try { + if (logger.isInfoEnabled()) { + logger.info("Close dubbo server: " + server.getLocalAddress()); + } + server.close(ConfigurationUtils.getServerShutdownTimeout()); + } catch (Throwable t) { + logger.warn(t.getMessage(), t); + } + } // ~ end of if ( server != null ) + + } // ~ end of loop serverMap + + } // ~ end of method destroy + + @Override + protected Invoker protocolBindingRefer(Class type, URL url) throws RpcException { + + ThriftInvoker invoker = new ThriftInvoker(type, url, getClients(url), invokers); + + invokers.add(invoker); + + return invoker; + + } + + private ExchangeClient[] getClients(URL url) { + + int connections = url.getParameter(CONNECTIONS_KEY, 1); + + ExchangeClient[] clients = new ExchangeClient[connections]; + + for (int i = 0; i < clients.length; i++) { + clients[i] = initClient(url); + } + return clients; + } + + private ExchangeClient initClient(URL url) { + + ExchangeClient client; + +// url = url.addParameter(CODEC_KEY, ThriftCodec.NAME); + + try { + client = Exchangers.connect(url); + } catch (RemotingException e) { + throw new RpcException("Fail to create remoting client for service(" + url + + "): " + e.getMessage(), e); + } + + return client; + + } + + private ProtocolServer getServer(URL url) { + // enable sending readonly event when server closes by default + url = url.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); + String str = url.getParameter(SERVER_KEY, org.apache.dubbo.rpc.Constants.DEFAULT_REMOTING_SERVER); + + if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { + throw new RpcException("Unsupported server type: " + str + ", url: " + url); + } + + ExchangeServer server; + try { + server = Exchangers.bind(url, handler); + } catch (RemotingException e) { + throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); + } + str = url.getParameter(CLIENT_KEY); + if (str != null && str.length() > 0) { + Set supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); + if (!supportedTypes.contains(str)) { + throw new RpcException("Unsupported client type: " + str); + } + } + return new ThriftProtocolServer(server); + } + + private class ThriftProtocolServer implements ProtocolServer { + + private ExchangeServer server; + private String address; + + public ThriftProtocolServer(ExchangeServer server) { + this.server = server; + } + + @Override + public RemotingServer getRemotingServer() { + return server; + } + + @Override + public String getAddress() { + return StringUtils.isNotEmpty(address) ? address : server.getUrl().getAddress(); + } + + @Override + public void setAddress(String address) { + this.address = address; + } + + @Override + public URL getUrl() { + return server.getUrl(); + } + + @Override + public void reset(URL url) { + server.reset(url); + } + + @Override + public void close() { + server.close(); + } + } + +}