diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java index 2bd774c85fd..f7cf9514193 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java @@ -103,5 +103,4 @@ public interface Constants { * prefix of arguments router key */ String ARGUMENTS = "arguments"; - } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Constants.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Constants.java index 638af54bd73..b936f6ba2d5 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Constants.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Constants.java @@ -93,4 +93,8 @@ public interface Constants { String REGISTRY_RETRY_PERIOD_KEY = "retry.period"; String SESSION_TIMEOUT_KEY = "session"; + + String NEED_REEXPORT = "need-reexport"; + + String ENABLE_CONFIGURATION_LISTEN = "enable-configuration-listen"; } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/OverrideInstanceAddressURL.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/OverrideInstanceAddressURL.java new file mode 100644 index 00000000000..d260b9ae346 --- /dev/null +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/OverrideInstanceAddressURL.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.registry.client; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.metadata.MetadataInfo; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +public class OverrideInstanceAddressURL extends InstanceAddressURL { + private static final long serialVersionUID = 6015101839312074851L; + + private final Map overrideParams; + private final InstanceAddressURL originUrl; + + private final transient Map>> methodNumberCache = new ConcurrentHashMap<>(); + private volatile transient Map> methodNumbers; + private final transient Map> serviceNumberCache = new ConcurrentHashMap<>(); + private volatile transient Map numbers; + + public OverrideInstanceAddressURL(InstanceAddressURL originUrl) { + this.originUrl = originUrl; + this.overrideParams = new HashMap<>(); + } + + public OverrideInstanceAddressURL(InstanceAddressURL originUrl, Map overrideParams) { + this.originUrl = originUrl; + this.overrideParams = overrideParams; + } + + @Override + public ServiceInstance getInstance() { + return originUrl.getInstance(); + } + + @Override + public MetadataInfo getMetadataInfo() { + return originUrl.getMetadataInfo(); + } + + @Override + public String getServiceInterface() { + return originUrl.getServiceInterface(); + } + + @Override + public String getGroup() { + return originUrl.getGroup(); + } + + @Override + public String getVersion() { + return originUrl.getVersion(); + } + + @Override + public String getProtocol() { + return originUrl.getProtocol(); + } + + @Override + public String getProtocolServiceKey() { + return originUrl.getProtocolServiceKey(); + } + + @Override + public String getServiceKey() { + return originUrl.getServiceKey(); + } + + @Override + public String getAddress() { + return originUrl.getAddress(); + } + + @Override + public String getHost() { + return originUrl.getHost(); + } + + @Override + public int getPort() { + return originUrl.getPort(); + } + + @Override + public String getIp() { + return originUrl.getIp(); + } + + @Override + public String getPath() { + return originUrl.getPath(); + } + + @Override + public String getParameter(String key) { + String overrideParam = overrideParams.get(key); + return StringUtils.isNotEmpty(overrideParam) ? overrideParam : originUrl.getParameter(key); + } + + @Override + public String getServiceParameter(String service, String key) { + String overrideParam = overrideParams.get(key); + return StringUtils.isNotEmpty(overrideParam) ? overrideParam : originUrl.getServiceParameter(service, key); + } + + @Override + public String getServiceMethodParameter(String protocolServiceKey, String method, String key) { + String overrideParam = overrideParams.get(method + "." + key); + return StringUtils.isNotEmpty(overrideParam) ? + overrideParam : + originUrl.getServiceMethodParameter(protocolServiceKey, method, key); + } + + @Override + public String getMethodParameter(String method, String key) { + String overrideParam = overrideParams.get(method + "." + key); + return StringUtils.isNotEmpty(overrideParam) ? + overrideParam : + originUrl.getMethodParameter(method, key); + } + + @Override + public boolean hasServiceMethodParameter(String protocolServiceKey, String method, String key) { + return StringUtils.isNotEmpty(overrideParams.get(method + "." + key)) || + originUrl.hasServiceMethodParameter(protocolServiceKey, method, key); + } + + @Override + public boolean hasMethodParameter(String method, String key) { + return StringUtils.isNotEmpty(overrideParams.get(method + "." + key)) || + originUrl.hasMethodParameter(method, key); + } + + @Override + public boolean hasServiceMethodParameter(String protocolServiceKey, String method) { + return overrideParams.keySet().stream().anyMatch((e)->e.startsWith(method) || originUrl.hasServiceMethodParameter(protocolServiceKey, method)); + } + + @Override + public boolean hasMethodParameter(String method) { + return overrideParams.keySet().stream().anyMatch((e)->e.startsWith(method) || originUrl.hasMethodParameter(method)); + } + + @Override + public Map getServiceParameters(String protocolServiceKey) { + Map parameters = originUrl.getServiceParameters(protocolServiceKey); + Map overrideParameters = overrideParams; + Map result = new HashMap<>((int) (parameters.size() + overrideParameters.size() / 0.75f) + 1); + result.putAll(parameters); + result.putAll(overrideParameters); + return result; + } + + @Override + public Map getParameters() { + Map parameters = originUrl.getParameters(); + Map overrideParameters = overrideParams; + Map result = new HashMap<>((int) (parameters.size() + overrideParameters.size() / 0.75f) + 1); + result.putAll(parameters); + result.putAll(overrideParameters); + return result; + } + + @Override + public URL addParameter(String key, String value) { + HashMap map = new HashMap<>(overrideParams); + map.put(key, value); + return new OverrideInstanceAddressURL(originUrl, map); + } + + @Override + public URL addParameterIfAbsent(String key, String value) { + HashMap map = new HashMap<>(overrideParams); + map.putIfAbsent(key, value); + return new OverrideInstanceAddressURL(originUrl, map); + } + + @Override + public URL addServiceParameter(String protocolServiceKey, String key, String value) { + return originUrl.addServiceParameter(protocolServiceKey, key, value); + } + + @Override + public URL addServiceParameterIfAbsent(String protocolServiceKey, String key, String value) { + return originUrl.addServiceParameterIfAbsent(protocolServiceKey, key, value); + } + + @Override + public URL addConsumerParams(String protocolServiceKey, Map params) { + return originUrl.addConsumerParams(protocolServiceKey, params); + } + + @Override + public URL addParameters(Map parameters) { + HashMap map = new HashMap<>(overrideParams); + map.putAll(parameters); + return new OverrideInstanceAddressURL(originUrl, map); + } + + @Override + public URL addParametersIfAbsent(Map parameters) { + HashMap map = new HashMap<>(overrideParams); + parameters.forEach(map::putIfAbsent); + return new OverrideInstanceAddressURL(originUrl, map); + } + + @Override + protected Map getServiceNumbers(String protocolServiceKey) { + return serviceNumberCache.computeIfAbsent(protocolServiceKey, (k) -> new ConcurrentHashMap<>()); + } + + @Override + protected Map getNumbers() { + if (numbers == null) { // concurrent initialization is tolerant + numbers = new ConcurrentHashMap<>(); + } + return numbers; + } + + @Override + protected Map> getServiceMethodNumbers(String protocolServiceKey) { + return methodNumberCache.computeIfAbsent(protocolServiceKey, (k) -> new ConcurrentHashMap<>()); + } + + @Override + protected Map> getMethodNumbers() { + if (methodNumbers == null) { // concurrent initialization is tolerant + methodNumbers = new ConcurrentHashMap<>(); + } + return methodNumbers; + } + + public Map getOverrideParams() { + return overrideParams; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + OverrideInstanceAddressURL that = (OverrideInstanceAddressURL) o; + return Objects.equals(overrideParams, that.overrideParams) && Objects.equals(originUrl, that.originUrl); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), overrideParams, originUrl); + } + + @Override + public String toString() { + return originUrl.toString() + ", overrideParams: " + overrideParams.toString(); + } + +} diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java index a7f968a0f52..9051b2be82e 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java @@ -17,6 +17,7 @@ package org.apache.dubbo.registry.client; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; @@ -24,11 +25,15 @@ import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.registry.AddressListener; +import org.apache.dubbo.registry.Constants; import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; +import org.apache.dubbo.registry.integration.AbstractConfiguratorListener; import org.apache.dubbo.registry.integration.DynamicDirectory; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Protocol; import org.apache.dubbo.rpc.RpcContext; +import org.apache.dubbo.rpc.cluster.Configurator; +import org.apache.dubbo.rpc.model.ApplicationModel; import java.util.ArrayList; import java.util.Collection; @@ -40,12 +45,18 @@ import static org.apache.dubbo.common.constants.CommonConstants.DISABLED_KEY; import static org.apache.dubbo.common.constants.CommonConstants.ENABLED_KEY; import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL; +import static org.apache.dubbo.registry.Constants.CONFIGURATORS_SUFFIX; public class ServiceDiscoveryRegistryDirectory extends DynamicDirectory { private static final Logger logger = LoggerFactory.getLogger(ServiceDiscoveryRegistryDirectory.class); // instance address to invoker mapping. private volatile Map> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference + private final static ConsumerConfigurationListener CONSUMER_CONFIGURATION_LISTENER = new ConsumerConfigurationListener(); + private volatile ReferenceConfigurationListener referenceConfigurationListener; + private volatile boolean enableConfigurationListen = true; + private volatile List originalUrls = null; // initial for null + private volatile Map overrideQueryMap; private ServiceInstancesChangedListener listener; @@ -53,6 +64,28 @@ public ServiceDiscoveryRegistryDirectory(Class serviceType, URL url) { super(serviceType, url); } + @Override + public void subscribe(URL url) { + super.subscribe(url); + if (ApplicationModel.getEnvironment().getConfiguration().convert(Boolean.class, Constants.ENABLE_CONFIGURATION_LISTEN, true)) { + enableConfigurationListen = true; + CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); + referenceConfigurationListener = new ReferenceConfigurationListener(this, url); + } else { + enableConfigurationListen = false; + } + } + + @Override + public void unSubscribe(URL url) { + super.unSubscribe(url); + this.originalUrls = null; + if (ApplicationModel.getEnvironment().getConfiguration().convert(Boolean.class, Constants.ENABLE_CONFIGURATION_LISTEN, true)) { + CONSUMER_CONFIGURATION_LISTENER.removeNotifyListener(this); + referenceConfigurationListener.stop(); + } + } + @Override public boolean isAvailable() { if (isDestroyed()) { @@ -85,11 +118,72 @@ public synchronized void notify(List instanceUrls) { } } + refreshOverrideAndInvoker(instanceUrls); + } + + // RefreshOverrideAndInvoker will be executed by registryCenter and configCenter, so it should be synchronized. + private synchronized void refreshOverrideAndInvoker(List instanceUrls) { + // mock zookeeper://xxx?mock=return null + if (enableConfigurationListen) { + overrideDirectoryUrl(); + } refreshInvoker(instanceUrls); } + // TODO: exact + private void overrideDirectoryUrl() { + // merge override parameters + this.overrideDirectoryUrl = directoryUrl; + List localAppDynamicConfigurators = CONSUMER_CONFIGURATION_LISTENER.getConfigurators(); // local reference + doOverrideUrl(localAppDynamicConfigurators); + if (referenceConfigurationListener != null) { + List localDynamicConfigurators = referenceConfigurationListener.getConfigurators(); // local reference + doOverrideUrl(localDynamicConfigurators); + } + } + + private void doOverrideUrl(List configurators) { + if (CollectionUtils.isNotEmpty(configurators)) { + for (Configurator configurator : configurators) { + this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); + Map newParams = new HashMap<>(this.overrideDirectoryUrl.getParameters()); + directoryUrl.getParameters().forEach(newParams::remove); + this.overrideQueryMap = newParams; + } + } + } + + private InstanceAddressURL overrideWithConfigurator(InstanceAddressURL providerUrl) { + // override url with configurator from configurator from "app-name.configurators" + providerUrl = overrideWithConfigurators(CONSUMER_CONFIGURATION_LISTENER.getConfigurators(), providerUrl); + + // override url with configurator from configurators from "service-name.configurators" + if (referenceConfigurationListener != null) { + providerUrl = overrideWithConfigurators(referenceConfigurationListener.getConfigurators(), providerUrl); + } + + return providerUrl; + } + + private InstanceAddressURL overrideWithConfigurators(List configurators, InstanceAddressURL url) { + if (CollectionUtils.isNotEmpty(configurators)) { + // wrap url + OverrideInstanceAddressURL overrideInstanceAddressURL = new OverrideInstanceAddressURL(url); + if (overrideQueryMap != null) { + // override app-level configs + overrideInstanceAddressURL = (OverrideInstanceAddressURL) overrideInstanceAddressURL.addParameters(overrideQueryMap); + } + for (Configurator configurator : configurators) { + overrideInstanceAddressURL = (OverrideInstanceAddressURL) configurator.configure(overrideInstanceAddressURL); + } + return overrideInstanceAddressURL; + } + return url; + } + private void refreshInvoker(List invokerUrls) { Assert.notNull(invokerUrls, "invokerUrls should not be null, use empty url list to clear address."); + this.originalUrls = invokerUrls; if (invokerUrls.size() == 0) { this.forbidden = true; // Forbid to access @@ -158,6 +252,11 @@ private Map> toInvokers(List urls) { // FIXME, some keys may need to be removed. instanceAddressURL.addConsumerParams(getConsumerUrl().getProtocolServiceKey(), queryMap); + // Override provider urls if needed + if (enableConfigurationListen) { + instanceAddressURL = overrideWithConfigurator(instanceAddressURL); + } + Invoker invoker = urlInvokerMap == null ? null : urlInvokerMap.get(instanceAddressURL.getAddress()); if (invoker == null || urlChanged(invoker, instanceAddressURL)) { // Not in the cache, refer again try { @@ -190,6 +289,17 @@ private boolean urlChanged(Invoker invoker, InstanceAddressURL newURL) { return true; } + if (oldURL instanceof OverrideInstanceAddressURL || newURL instanceof OverrideInstanceAddressURL) { + if(!(oldURL instanceof OverrideInstanceAddressURL && newURL instanceof OverrideInstanceAddressURL)) { + // sub-class changed + return true; + } else { + if (!((OverrideInstanceAddressURL) oldURL).getOverrideParams().equals(((OverrideInstanceAddressURL) newURL).getOverrideParams())) { + return true; + } + } + } + return !oldURL.getMetadataInfo().getServiceInfo(getConsumerUrl().getProtocolServiceKey()) .equals(newURL.getMetadataInfo().getServiceInfo(getConsumerUrl().getProtocolServiceKey())); } @@ -261,4 +371,63 @@ private void destroyUnusedInvokers(Map> oldUrlInvokerMap, Map } } } + + private static class ReferenceConfigurationListener extends AbstractConfiguratorListener { + private final ServiceDiscoveryRegistryDirectory directory; + private final URL url; + + ReferenceConfigurationListener(ServiceDiscoveryRegistryDirectory directory, URL url) { + this.directory = directory; + this.url = url; + this.initWith(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX); + } + + void stop() { + this.stopListen(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX); + } + + @Override + protected void notifyOverrides() { + // to notify configurator/router changes + if (directory.originalUrls != null) { + URL backup = RpcContext.getContext().getConsumerUrl(); + RpcContext.getContext().setConsumerUrl(directory.getConsumerUrl()); + directory.refreshOverrideAndInvoker(directory.originalUrls); + RpcContext.getContext().setConsumerUrl(backup); + } + } + } + + private static class ConsumerConfigurationListener extends AbstractConfiguratorListener { + private final List> listeners = new ArrayList<>(); + + ConsumerConfigurationListener() { + } + + void addNotifyListener(ServiceDiscoveryRegistryDirectory listener) { + if (listeners.size() == 0) { + this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX); + } + this.listeners.add(listener); + } + + void removeNotifyListener(ServiceDiscoveryRegistryDirectory listener) { + this.listeners.remove(listener); + if (listeners.size() == 0) { + this.stopListen(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX); + } + } + + @Override + protected void notifyOverrides() { + listeners.forEach(listener -> { + if (listener.originalUrls != null) { + URL backup = RpcContext.getContext().getConsumerUrl(); + RpcContext.getContext().setConsumerUrl(listener.getConsumerUrl()); + listener.refreshOverrideAndInvoker(listener.originalUrls); + RpcContext.getContext().setConsumerUrl(backup); + } + }); + } + } } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java index 6d7f476e522..987aba7d602 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java @@ -27,6 +27,7 @@ import org.apache.dubbo.common.utils.NamedThreadFactory; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.common.utils.UrlUtils; +import org.apache.dubbo.registry.Constants; import org.apache.dubbo.registry.NotifyListener; import org.apache.dubbo.registry.Registry; import org.apache.dubbo.registry.RegistryFactory; @@ -694,7 +695,9 @@ public synchronized void doOverrideIfNecessary() { newUrl = getConfiguredInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey()) .getConfigurators(), newUrl); if (!currentUrl.equals(newUrl)) { - RegistryProtocol.this.reExport(originInvoker, newUrl); + if(newUrl.getParameter(Constants.NEED_REEXPORT, true)) { + RegistryProtocol.this.reExport(originInvoker, newUrl); + } LOGGER.info("exported provider url changed, origin url: " + originUrl + ", old export url: " + currentUrl + ", new export url: " + newUrl); }