Skip to content

Commit

Permalink
Revert "[2.7] Fix Nacos aggregate listen (apache#10467)"
Browse files Browse the repository at this point in the history
This reverts commit 8b580d0.
  • Loading branch information
hujun-w-2 committed Nov 28, 2022
1 parent b4386f9 commit 59d4f94
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 109 deletions.

This file was deleted.

Expand Up @@ -34,23 +34,24 @@
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.Collections;

import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
Expand Down Expand Up @@ -123,9 +124,7 @@ public class NacosRegistry extends FailbackRegistry {

private final NacosNamingServiceWrapper namingService;

private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, NacosAggregateListener>> originToAggregateListener = new ConcurrentHashMap<>();

private final ConcurrentMap<URL, ConcurrentMap<NacosAggregateListener, EventListener>> nacosListeners = new ConcurrentHashMap<>();
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, EventListener>> nacosListeners = new ConcurrentHashMap<>();

public NacosRegistry(URL url, NacosNamingServiceWrapper namingService) {
super(url);
Expand Down Expand Up @@ -180,23 +179,22 @@ public void doUnregister(final URL url) {

@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
NacosAggregateListener nacosAggregateListener = new NacosAggregateListener(listener);
originToAggregateListener.computeIfAbsent(url, k -> new ConcurrentHashMap<>()).put(listener, nacosAggregateListener);

Set<String> serviceNames = getServiceNames(url, nacosAggregateListener);
Set<String> serviceNames = getServiceNames(url, listener);

//Set corresponding serviceNames for easy search later
if (isServiceNamesWithCompatibleMode(url)) {
for (String serviceName : serviceNames) {
NacosInstanceManageUtil.setCorrespondingServiceNames(serviceName, serviceNames);
}
}
doSubscribe(url, nacosAggregateListener, serviceNames);

doSubscribe(url, listener, serviceNames);
}

private void doSubscribe(final URL url, final NacosAggregateListener listener, final Set<String> serviceNames) {
private void doSubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames) {
execute(namingService -> {
if (isServiceNamesWithCompatibleMode(url)) {
List<Instance> allCorrespondingInstanceList = Lists.newArrayList();

/**
* Get all instances with serviceNames to avoid instance overwrite and but with empty instance mentioned
Expand All @@ -211,8 +209,9 @@ private void doSubscribe(final URL url, final NacosAggregateListener listener, f
List<Instance> instances = namingService.getAllInstances(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP));
NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
notifySubscriber(url, serviceName, listener, instances);
allCorrespondingInstanceList.addAll(instances);
}
notifySubscriber(url, listener, allCorrespondingInstanceList);
for (String serviceName : serviceNames) {
subscribeEventListener(serviceName, url, listener);
}
Expand All @@ -221,7 +220,7 @@ private void doSubscribe(final URL url, final NacosAggregateListener listener, f
for (String serviceName : serviceNames) {
instances.addAll(namingService.getAllInstances(serviceName
, getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)));
notifySubscriber(url, serviceName, listener, instances);
notifySubscriber(url, listener, instances);
subscribeEventListener(serviceName, url, listener);
}
}
Expand All @@ -244,24 +243,18 @@ private boolean isServiceNamesWithCompatibleMode(final URL url) {
public void doUnsubscribe(URL url, NotifyListener listener) {
if (isAdminProtocol(url)) {
shutdownServiceNamesLookup();
} else {
Map<NotifyListener, NacosAggregateListener> listenerMap = originToAggregateListener.get(url);
NacosAggregateListener nacosAggregateListener = listenerMap.remove(listener);
if (nacosAggregateListener != null) {
Set<String> serviceNames = getServiceNames(url, nacosAggregateListener);
}
else {
Set<String> serviceNames = getServiceNames(url, listener);

doUnsubscribe(url, nacosAggregateListener, serviceNames);
}
if (listenerMap.isEmpty()) {
originToAggregateListener.remove(url);
}
doUnsubscribe(url, listener, serviceNames);
}
}

private void doUnsubscribe(final URL url, final NacosAggregateListener nacosAggregateListener, final Set<String> serviceNames) {
private void doUnsubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames) {
execute(namingService -> {
for (String serviceName : serviceNames) {
unsubscribeEventListener(serviceName, url, nacosAggregateListener);
unsubscribeEventListener(serviceName, url, listener);
}
});
}
Expand All @@ -279,7 +272,7 @@ private void shutdownServiceNamesLookup() {
* @param listener {@link NotifyListener}
* @return non-null
*/
private Set<String> getServiceNames(URL url, NacosAggregateListener listener) {
private Set<String> getServiceNames(URL url, NotifyListener listener) {
if (isAdminProtocol(url)) {
scheduleServiceNamesLookup(url, listener);
return getServiceNamesForOps(url);
Expand Down Expand Up @@ -313,7 +306,7 @@ private Set<String> filterServiceNames(NacosServiceName serviceName) {
Set<String> serviceNames = new LinkedHashSet<>();

execute(namingService -> serviceNames.addAll(namingService.getServicesOfServer(1, Integer.MAX_VALUE,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)).getData()
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)).getData()
.stream()
.filter(this::isConformRules)
.map(NacosServiceName::new)
Expand Down Expand Up @@ -364,7 +357,7 @@ private boolean isAdminProtocol(URL url) {
return ADMIN_PROTOCOL.equals(url.getProtocol());
}

private void scheduleServiceNamesLookup(final URL url, final NacosAggregateListener listener) {
private void scheduleServiceNamesLookup(final URL url, final NotifyListener listener) {
if (scheduledExecutorService == null) {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(() -> {
Expand Down Expand Up @@ -517,9 +510,9 @@ private List<URL> buildURLs(URL consumerURL, Collection<Instance> instances) {
return urls;
}

private void subscribeEventListener(String serviceName, final URL url, final NacosAggregateListener listener)
private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
throws NacosException {
ConcurrentMap<NacosAggregateListener, EventListener> listeners = nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ConcurrentMap<NotifyListener, EventListener> listeners = nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
EventListener nacosListener = listeners.computeIfAbsent(listener, k -> {
EventListener eventListener = event -> {
if (event instanceof NamingEvent) {
Expand All @@ -535,7 +528,7 @@ private void subscribeEventListener(String serviceName, final URL url, final Nac
instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
}

notifySubscriber(url, serviceName, listener, instances);
notifySubscriber(url, listener, instances);
}
};
return eventListener;
Expand All @@ -545,14 +538,14 @@ private void subscribeEventListener(String serviceName, final URL url, final Nac
nacosListener);
}

private void unsubscribeEventListener(String serviceName, final URL url, final NacosAggregateListener listener)
private void unsubscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
throws NacosException {
ConcurrentMap<NacosAggregateListener, EventListener> notifyListenerEventListenerConcurrentMap = nacosListeners.get(url);
if (notifyListenerEventListenerConcurrentMap == null) {
ConcurrentMap<NotifyListener, EventListener> notifyListenerEventListenerConcurrentMap = nacosListeners.get(url);
if(notifyListenerEventListenerConcurrentMap == null){
return;
}
EventListener nacosListener = notifyListenerEventListenerConcurrentMap.get(listener);
if (nacosListener == null) {
if(nacosListener == null){
return;
}
namingService.unsubscribe(serviceName,
Expand All @@ -567,14 +560,14 @@ private void unsubscribeEventListener(String serviceName, final URL url, final N
* @param listener {@link NotifyListener}
* @param instances all {@link Instance instances}
*/
private void notifySubscriber(URL url, String serviceName, NacosAggregateListener listener, Collection<Instance> instances) {
private void notifySubscriber(URL url, NotifyListener listener, Collection<Instance> instances) {
List<Instance> enabledInstances = new LinkedList<>(instances);
if (enabledInstances.size() > 0) {
// Instances
filterEnabledInstances(enabledInstances);
}
List<URL> aggregatedUrls = toUrlWithEmpty(url, listener.saveAndAggregateAllInstances(serviceName, enabledInstances));
NacosRegistry.this.notify(url, listener.getNotifyListener(), aggregatedUrls);
List<URL> urls = toUrlWithEmpty(url, enabledInstances);
NacosRegistry.this.notify(url, listener, urls);
}

/**
Expand Down

0 comments on commit 59d4f94

Please sign in to comment.