diff --git a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceConsumerBootstrap.java b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceConsumerBootstrap.java index 4b5fd4b5f1e..1ea0462cc7a 100644 --- a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceConsumerBootstrap.java +++ b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceConsumerBootstrap.java @@ -33,10 +33,10 @@ public static void main(String[] args) throws Exception { .application("dubbo-consumer-demo") .protocol(builder -> builder.port(20887).name("dubbo")) // Eureka - .registry(builder -> builder.address("eureka://127.0.0.1:8761?registry-type=service&subscribed-services=dubbo-provider-demo")) +// .registry(builder -> builder.address("eureka://127.0.0.1:8761?registry-type=service&subscribed-services=dubbo-provider-demo")) // Zookeeper - // .registry("zookeeper", builder -> builder.address("zookeeper://127.0.0.1:2181?registry-type=service&subscribed-services=dubbo-provider-demo")) + .registry("zookeeper", builder -> builder.address("zookeeper://127.0.0.1:2181?registry-type=service&subscribed-services=dubbo-provider-demo")) // .metadataReport(new MetadataReportConfig("zookeeper://127.0.0.1:2181")) // Nacos @@ -55,10 +55,16 @@ public static void main(String[] args) throws Exception { EchoService echoService = referenceConfig.get(); +// ReferenceConfig referenceConfig2 = configManager.getReference("user"); + +// UserService userService = referenceConfig2.get(); + for (int i = 0; i < 500; i++) { Thread.sleep(2000L); System.out.println(echoService.echo("Hello,World")); +// System.out.println(userService.getUser(1L)); } + } } diff --git a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderMinimumBootstrap.java b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderMinimumBootstrap.java index 4e1958bb142..58760e5340b 100644 --- a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderMinimumBootstrap.java +++ b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderMinimumBootstrap.java @@ -16,6 +16,9 @@ */ package org.apache.dubbo.bootstrap; +import org.apache.dubbo.bootstrap.rest.UserService; +import org.apache.dubbo.bootstrap.rest.UserServiceImpl; + /** * TODO */ @@ -24,10 +27,11 @@ public class DubboServiceProviderMinimumBootstrap { public static void main(String[] args) { new DubboBootstrap() .application("dubbo-provider-demo") -// .registry(builder -> builder.address("zookeeper://127.0.0.1:2181?registry.type=service")) - .registry(builder -> builder.address("eureka://127.0.0.1:8761?registry-type=service")) + .registry(builder -> builder.address("zookeeper://127.0.0.1:2181?registry-type=service")) +// .registry(builder -> builder.address("eureka://127.0.0.1:8761?registry-type=service")) .protocol(builder -> builder.port(-1).name("dubbo")) - .service(builder -> builder.interfaceClass(EchoService.class).ref(new EchoServiceImpl())) + .service("echo", builder -> builder.interfaceClass(EchoService.class).ref(new EchoServiceImpl())) + .service("user", builder -> builder.interfaceClass(UserService.class).ref(new UserServiceImpl())) .start() .await(); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java index d30e59b84aa..98f3275e7ac 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java @@ -17,6 +17,7 @@ package org.apache.dubbo.registry.client; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.URLBuilder; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.StringUtils; @@ -36,13 +37,14 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; +import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.SortedSet; import java.util.stream.Collectors; @@ -53,12 +55,15 @@ import static java.util.Collections.unmodifiableSet; import static java.util.stream.Collectors.toSet; import static java.util.stream.Stream.of; +import static org.apache.dubbo.common.URLBuilder.from; import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PROTOCOL; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.PID_KEY; import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY; import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE; import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY; import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_TYPE_KEY; import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_TYPE; @@ -107,6 +112,13 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry { private final Set registeredListeners = new LinkedHashSet<>(); + /** + * A cache for all URLs of services that the subscribed services exported + * The key is the service name + * The value is a nested {@link Map} whose key is the revision and value is all URLs of services + */ + private final Map>> serviceExportedURLsCache = new LinkedHashMap<>(); + public ServiceDiscoveryRegistry(URL registryURL) { super(registryURL); this.serviceDiscovery = createServiceDiscovery(registryURL); @@ -328,8 +340,6 @@ protected void subscribeURLs(URL subscribedURL, NotifyListener listener, String private List getSubscribedURLs(URL subscribedURL, Collection instances) { - List subscribedURLs = new LinkedList<>(); - // local service instances could be mutable List serviceInstances = instances.stream() .filter(ServiceInstance::isEnabled) @@ -337,78 +347,107 @@ private List getSubscribedURLs(URL subscribedURL, Collection> revisionURLsCache = new HashMap<>(); + int size = serviceInstances.size(); - // try to get the exported URLs from every instance until it's successful. - for (int i = 0; i < serviceInstances.size(); i++) { - // select a instance of {@link ServiceInstance} - ServiceInstance selectedInstance = selectServiceInstance(serviceInstances); - List templateURLs = getTemplateURLs(subscribedURL, selectedInstance, revisionURLsCache); - if (isNotEmpty(templateURLs)) { - // add templateURLs into subscribedURLs - subscribedURLs.addAll(templateURLs); - // remove the selected ServiceInstance in this time, it remains N - 1 elements. - serviceInstances.remove(selectedInstance); - break; - } + if (size == 0) { + return emptyList(); } + expungeStaleRevisionExportedURLs(serviceInstances); + + initTemplateURLs(subscribedURL, serviceInstances); + // Clone the subscribed URLs from the template URLs - List clonedURLs = cloneSubscribedURLs(subscribedURL, serviceInstances, revisionURLsCache); - // Add all cloned URLs into subscribedURLs - subscribedURLs.addAll(clonedURLs); - // clear all revisions - revisionURLsCache.clear(); + List subscribedURLs = cloneSubscribedURLs(subscribedURL, serviceInstances); // clear local service instances serviceInstances.clear(); - return subscribedURLs; } - private List cloneSubscribedURLs(URL subscribedURL, Collection serviceInstances, - Map> revisionURLsCache) { + private void initTemplateURLs(URL subscribedURL, List serviceInstances) { + // Try to get the template URLs until success + for (int i = 0; i < serviceInstances.size(); i++) { + // select a instance of {@link ServiceInstance} + ServiceInstance selectedInstance = selectServiceInstance(serviceInstances); + // try to get the template URLs + List templateURLs = getTemplateURLs(subscribedURL, selectedInstance); + if (isNotEmpty(templateURLs)) { // If the result is valid + break; // break the loop + } else { + serviceInstances.remove(selectedInstance); // remove if the service instance is not available + // There may be one or more service instances from the "serviceInstances" + } + } + } - // If revisionURLsCache is not empty, clone the template URLs to be the subscribed URLs - if (!revisionURLsCache.isEmpty()) { + private void expungeStaleRevisionExportedURLs(List serviceInstances) { - List clonedURLs = new LinkedList<>(); + if (isEmpty(serviceInstances)) { // if empty, return immediately + return; + } - Iterator iterator = serviceInstances.iterator(); + String serviceName = serviceInstances.get(0).getServiceName(); - while (iterator.hasNext()) { + synchronized (this) { - ServiceInstance serviceInstance = iterator.next(); + // revisionExportedURLs is mutable + Map> revisionExportedURLs = serviceExportedURLsCache.computeIfAbsent(serviceName, s -> new HashMap<>()); - List templateURLs = getTemplateURLs(subscribedURL, serviceInstance, revisionURLsCache); - // The parameters of URLs that the MetadataService exported - Map> serviceURLsParams = getMetadataServiceURLsParams(serviceInstance); + if (revisionExportedURLs.isEmpty()) { // if empty, return immediately + return; + } - templateURLs.forEach(templateURL -> { + Set existedRevisions = revisionExportedURLs.keySet(); // read-only + Set currentRevisions = serviceInstances.stream() + .map(ServiceInstanceMetadataUtils::getExportedServicesRevision) + .collect(Collectors.toSet()); + // staleRevisions = existedRevisions(copy) - currentRevisions + Set staleRevisions = new HashSet<>(existedRevisions); + staleRevisions.removeAll(currentRevisions); + // remove exported URLs if staled + staleRevisions.forEach(revisionExportedURLs::remove); + } + } - String protocol = templateURL.getProtocol(); + private List cloneSubscribedURLs(URL subscribedURL, Collection serviceInstances) { - Map serviceURLParams = serviceURLsParams.get(protocol); + if (isEmpty(serviceInstances)) { + return emptyList(); + } - String host = getProviderHost(serviceURLParams); + List clonedURLs = new LinkedList<>(); - Integer port = getProviderPort(serviceURLParams); + serviceInstances.forEach(serviceInstance -> { - /** - * clone the subscribed {@link URL urls} based on the template {@link URL url} - */ - URL newSubscribedURL = new URL(protocol, host, port, templateURL.getParameters()); - clonedURLs.add(newSubscribedURL); - }); - } + List templateURLs = getTemplateURLs(subscribedURL, serviceInstance); + // The parameters of URLs that the MetadataService exported + Map> serviceURLsParams = getMetadataServiceURLsParams(serviceInstance); - return clonedURLs; - } + serviceURLsParams.forEach((protocol, parametersMap) -> { + templateURLs.stream() + .filter(templateURL -> isCompatibleProtocol(protocol, templateURL)) + .map(templateURL -> templateURL.removeParameter(TIMESTAMP_KEY)) + .map(templateURL -> templateURL.removeParameter(PID_KEY)) + .map(templateURL -> { + + String host = getProviderHost(parametersMap); + Integer port = getProviderPort(parametersMap); - return Collections.emptyList(); + if (Objects.equals(templateURL.getHost(), host) + && Objects.equals(templateURL.getPort(), port)) { // use templateURL if equals + return templateURL; + } + + URLBuilder clonedURLBuilder = from(templateURL) // remove the parameters from the template URL + .setHost(host) // reset the host + .setPort(port); // reset the port + + return clonedURLBuilder.build(); + }) + .forEach(clonedURLs::add); + }); + }); + return clonedURLs; } @@ -419,74 +458,187 @@ private List cloneSubscribedURLs(URL subscribedURL, Collectionnull if serviceInstances is empty. */ private ServiceInstance selectServiceInstance(List serviceInstances) { + int size = serviceInstances.size(); + if (size == 0) { + return null; + } else if (size == 1) { + return serviceInstances.get(0); + } ServiceInstanceSelector selector = getExtensionLoader(ServiceInstanceSelector.class).getAdaptiveExtension(); return selector.select(getUrl(), serviceInstances); } - /** * Get the template {@link URL urls} from the specified {@link ServiceInstance}. *

- * Typically, the revisions of all {@link ServiceInstance instances} in one service are same. However, - * if one service is upgrading one or more Dubbo service interfaces, one of them may have the multiple declarations - * is deploying in the different {@link ServiceInstance service instances}, thus, it has to compare the interface - * contract one by one, the "revision" that is the number is introduced to identify all Dubbo exported interfaces in - * one {@link ServiceInstance service instance}. - *

* First, put the revision {@link ServiceInstance service instance} - * associating {@link #getProviderExportedURLs(URL, ServiceInstance) exported URLs} into cache. + * associating {@link #getExportedURLs(URL, ServiceInstance) exported URLs} into cache. *

* And then compare a new {@link ServiceInstance service instances'} revision with cached one,If they are equal, * return the cached template {@link URL urls} immediately, or to get template {@link URL urls} that the provider - * {@link ServiceInstance instance} exported via executing {@link #getProviderExportedURLs(URL, ServiceInstance)} + * {@link ServiceInstance instance} exported via executing {@link #getExportedURLs(URL, ServiceInstance)} * method. *

* Eventually, the retrieving result will be cached and returned. * - * @param subscribedURL the subscribed {@link URL url} - * @param selectedInstance the {@link ServiceInstance} - * @param revisionURLsCache A caches all revisions of exported services in different {@link ServiceInstance}s - * associating with the {@link URL urls} + * @param subscribedURL the subscribed {@link URL url} + * @param selectedInstance the {@link ServiceInstance} + * associating with the {@link URL urls} * @return non-null {@link List} of {@link URL urls} */ - protected List getTemplateURLs(URL subscribedURL, ServiceInstance selectedInstance, - Map> revisionURLsCache) { + protected List getTemplateURLs(URL subscribedURL, ServiceInstance selectedInstance) { + + List exportedURLs = getRevisionExportedURLs(selectedInstance); + + if (isEmpty(exportedURLs)) { + return emptyList(); + } + + return filterSubscribedURLs(subscribedURL, exportedURLs); + } + + private List filterSubscribedURLs(URL subscribedURL, List exportedURLs) { + return exportedURLs.stream() + .filter(url -> isSameServiceInterface(subscribedURL, url)) + .filter(url -> isSameParameter(subscribedURL, url, VERSION_KEY)) + .filter(url -> isSameParameter(subscribedURL, url, GROUP_KEY)) + .filter(url -> isCompatibleProtocol(subscribedURL, url)) + .collect(Collectors.toList()); + } + + private boolean isSameServiceInterface(URL one, URL another) { + return Objects.equals(one.getServiceInterface(), another.getServiceInterface()); + } + + private boolean isSameParameter(URL one, URL another, String key) { + return Objects.equals(one.getParameter(key), another.getParameter(key)); + } + + private boolean isCompatibleProtocol(URL one, URL another) { + String protocol = one.getParameter(PROTOCOL_KEY); + return isCompatibleProtocol(protocol, another); + } + + private boolean isCompatibleProtocol(String protocol, URL targetURL) { + return protocol == null || Objects.equals(protocol, targetURL.getParameter(PROTOCOL_KEY)) + || Objects.equals(protocol, targetURL.getProtocol()); + } + + /** + * Get all services {@link URL URLs} that the specified {@link ServiceInstance service instance} exported with cache + *

+ * Typically, the revisions of all {@link ServiceInstance instances} in one service are same. However, + * if one service is upgrading one or more Dubbo service interfaces, one of them may have the multiple declarations + * is deploying in the different {@link ServiceInstance service instances}, thus, it has to compare the interface + * contract one by one, the "revision" that is the number is introduced to identify all Dubbo exported interfaces in + * one {@link ServiceInstance service instance}. + * + * @param providerServiceInstance the {@link ServiceInstance} provides the Dubbo Services + * @return the same as {@link #getExportedURLs(ServiceInstance)} + */ + private List getRevisionExportedURLs(ServiceInstance providerServiceInstance) { + + if (providerServiceInstance == null) { + return emptyList(); + } + + String serviceName = providerServiceInstance.getServiceName(); // get the revision from the specified {@link ServiceInstance} - String revision = getExportedServicesRevision(selectedInstance); - // try to get templateURLs from cache - List templateURLs = revisionURLsCache.get(revision); - - if (isEmpty(templateURLs)) { // not exists or getting failed last time - - if (!revisionURLsCache.isEmpty()) { // it's not first time - if (logger.isWarnEnabled()) { - logger.warn(format("The ServiceInstance[id: %s, host : %s , port : %s] has different revision : %s" + - ", please make sure the service [name : %s] is changing or not.", - selectedInstance.getId(), - selectedInstance.getHost(), - selectedInstance.getPort(), - revision, - selectedInstance.getServiceName() - )); + String revision = getExportedServicesRevision(providerServiceInstance); + + List exportedURLs = null; + + synchronized (this) { // It's required to lock here because it may run in the sync or async mode + + Map> exportedURLsMap = serviceExportedURLsCache.computeIfAbsent(serviceName, s -> new LinkedHashMap()); + + exportedURLs = exportedURLsMap.get(revision); + + boolean firstGet = false; + + if (exportedURLs == null) { // The hit is missing in cache + + if (!exportedURLsMap.isEmpty()) { // The case is that current ServiceInstance with the different revision + if (logger.isWarnEnabled()) { + logger.warn(format("The ServiceInstance[id: %s, host : %s , port : %s] has different revision : %s" + + ", please make sure the service [name : %s] is changing or not.", + providerServiceInstance.getId(), + providerServiceInstance.getHost(), + providerServiceInstance.getPort(), + revision, + providerServiceInstance.getServiceName() + )); + } + } else {// Else, it's the first time to get the exported URLs + firstGet = true; + } + exportedURLs = getExportedURLs(providerServiceInstance); + + if (exportedURLs != null) { // just allow the valid result into exportedURLsMap + + exportedURLsMap.put(revision, exportedURLs); + + if (logger.isDebugEnabled()) { + logger.debug(format("Getting the exported URLs[size : %s, first : %s] from the target service " + + "instance [id: %s , service : %s , host : %s , port : %s , revision : %s]", + exportedURLs.size(), firstGet, + providerServiceInstance.getId(), + providerServiceInstance.getServiceName(), + providerServiceInstance.getHost(), + providerServiceInstance.getPort(), + revision + )); + } } } - // get or get again - templateURLs = getProviderExportedURLs(subscribedURL, selectedInstance); - // put into cache - revisionURLsCache.put(revision, templateURLs); } - return templateURLs; + // Get a copy from source in order to prevent the caller trying to change the cached data + return exportedURLs != null ? new ArrayList<>(exportedURLs) : null; + } + + /** + * Get all services {@link URL URLs} that the specified {@link ServiceInstance service instance} exported + * from {@link MetadataService} proxy + * + * @param providerServiceInstance the {@link ServiceInstance} provides the Dubbo Services + * @return The possible result : + *

    + *
  1. The normal result
  2. + *
  3. The empty result if the {@link ServiceInstance service instance} did not export yet
  4. + *
  5. null if there is an invocation error on {@link MetadataService} proxy
  6. + *
+ */ + private List getExportedURLs(ServiceInstance providerServiceInstance) { + + List exportedURLs = null; + + String metadataStorageType = getMetadataStorageType(providerServiceInstance); + + try { + MetadataService metadataService = MetadataServiceProxyFactory + .getExtension(metadataStorageType == null ? DEFAULT_EXTENSION : metadataStorageType) + .getProxy(providerServiceInstance); + SortedSet urls = metadataService.getExportedURLs(); + exportedURLs = urls.stream().map(URL::valueOf).collect(Collectors.toList()); + } catch (Throwable e) { + if (logger.isErrorEnabled()) { + logger.error(format("It's failed to get the exported URLs from the target service instance[%s]", + providerServiceInstance), e); + } + exportedURLs = null; // set the result to be null if failed to get + } + return exportedURLs; } /** * Get the exported {@link URL urls} from the specified provider {@link ServiceInstance instance} * - * @param subscribedURL the subscribed {@link URL url} - * @param providerInstance the target provider {@link ServiceInstance instance} + * @param subscribedURL the subscribed {@link URL url} + * @param providerServiceInstance the target provider {@link ServiceInstance instance} * @return non-null {@link List} of {@link URL urls} */ - protected List getProviderExportedURLs(URL subscribedURL, ServiceInstance providerInstance) { + protected List getExportedURLs(URL subscribedURL, ServiceInstance providerServiceInstance) { List exportedURLs = emptyList(); @@ -495,12 +647,12 @@ protected List getProviderExportedURLs(URL subscribedURL, ServiceInstance p String version = subscribedURL.getParameter(VERSION_KEY); // The subscribed protocol may be null String protocol = subscribedURL.getParameter(PROTOCOL_KEY); - String metadataStorageType = getMetadataStorageType(providerInstance); + String metadataStorageType = getMetadataStorageType(providerServiceInstance); try { MetadataService metadataService = MetadataServiceProxyFactory .getExtension(metadataStorageType == null ? DEFAULT_EXTENSION : metadataStorageType) - .getProxy(providerInstance); + .getProxy(providerServiceInstance); SortedSet urls = metadataService.getExportedURLs(serviceInterface, group, version, protocol); exportedURLs = urls.stream().map(URL::valueOf).collect(Collectors.toList()); } catch (Throwable e) { @@ -562,4 +714,4 @@ public static ServiceDiscoveryRegistry create(URL registryURL) { public static boolean supports(URL registryURL) { return SERVICE_REGISTRY_TYPE.equalsIgnoreCase(registryURL.getParameter(REGISTRY_TYPE_KEY)); } -} +} \ No newline at end of file diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/selector/RandomServiceInstanceSelector.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/selector/RandomServiceInstanceSelector.java index ff5030469e5..605575f01cc 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/selector/RandomServiceInstanceSelector.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/selector/RandomServiceInstanceSelector.java @@ -37,7 +37,7 @@ public ServiceInstance select(URL registryURL, List serviceInst if (size < 1) { return null; } - int index = selectIndexRandomly(size); + int index = size == 1 ? 0 : selectIndexRandomly(size); return serviceInstances.get(index); }