Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dubbo-6720] fix bug same interface unexport and export fail. also support hotload service #6720

Merged
merged 14 commits into from Jun 15, 2021
Expand Up @@ -19,6 +19,7 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.Exporter;
Expand All @@ -42,7 +43,7 @@
/**
* abstract ProtocolSupport.
*/
public abstract class AbstractProtocol implements Protocol {
public abstract class AbstractProtocol implements DelegateExporterMap, Protocol {
owen200008 marked this conversation as resolved.
Show resolved Hide resolved

protected final Logger logger = LoggerFactory.getLogger(getClass());

Expand Down Expand Up @@ -110,7 +111,34 @@ public Map<String, Exporter<?>> getExporterMap() {
return exporterMap;
}

@Override
public Collection<Exporter<?>> getExporters() {
return Collections.unmodifiableCollection(exporterMap.values());
}

@Override
public boolean isEmpty() {
return CollectionUtils.isEmptyMap(exporterMap);
}

@Override
public Exporter<?> getExport(String key) {
return exporterMap.get(key);
}

@Override
public void addExportMap(String key, Exporter<?> exporter) {
exporterMap.put(key, exporter);
}

/**
* mgr exporterMap self, don't give to DubboExporter or InjvmExporter
*/
@Override
public void removeExportMap(String key, Exporter<?> exporter) {
Exporter<?> findExporter = exporterMap.get(key);
if(findExporter == exporter){
exporterMap.remove(key);
}
}
}
Expand Up @@ -77,7 +77,7 @@ public void setProxyFactory(ProxyFactory proxyFactory) {
@SuppressWarnings("unchecked")
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
final String uri = serviceKey(invoker.getUrl());
Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
Exporter<T> exporter = (Exporter<T>) getExport(uri);
if (exporter != null) {
// When modifying the configuration through override, you need to re-expose the newly modified service.
if (Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) {
Expand All @@ -88,7 +88,7 @@ public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
exporter = new AbstractExporter<T>(invoker) {
@Override
public void afterUnExport() {
exporterMap.remove(uri);
removeExportMap(uri, this);
if (runnable != null) {
try {
runnable.run();
Expand All @@ -98,7 +98,7 @@ public void afterUnExport() {
}
}
};
exporterMap.put(uri, exporter);
addExportMap(uri, exporter);
return exporter;
}

Expand Down Expand Up @@ -277,5 +277,4 @@ public boolean isClosed() {
}
}


}
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.protocol;

import org.apache.dubbo.rpc.Exporter;
import java.util.Collection;

/**
* delegate exportermap oper
*/
public interface DelegateExporterMap {

/**
* check is empty map
* @return
*/
boolean isEmpty();

/**
* get export
* @param key
* @return
*/
Exporter<?> getExport(String key);

/**
* add
* @param key
* @param exporter
*/
void addExportMap(String key, Exporter<?> exporter);

/**
* delete
* @param key
*/
void removeExportMap(String key, Exporter<?> exporter);

/**
* get all exports
* @return
*/
Collection<Exporter<?>> getExporters();

}
Expand Up @@ -16,11 +16,9 @@
*/
package org.apache.dubbo.rpc.protocol.dubbo;

import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.protocol.AbstractExporter;

import java.util.Map;
import org.apache.dubbo.rpc.protocol.DelegateExporterMap;

/**
* DubboExporter
Expand All @@ -29,17 +27,16 @@ public class DubboExporter<T> extends AbstractExporter<T> {

private final String key;

private final Map<String, Exporter<?>> exporterMap;
private final DelegateExporterMap delegateExporterMap;

public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
public DubboExporter(Invoker<T> invoker, String key, DelegateExporterMap delegateExporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
this.delegateExporterMap = delegateExporterMap;
}

@Override
public void afterUnExport() {
exporterMap.remove(key);
delegateExporterMap.removeExportMap(key, this);
}

}
Expand Up @@ -259,7 +259,7 @@ Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException
(String) inv.getObjectAttachments().get(VERSION_KEY),
(String) inv.getObjectAttachments().get(GROUP_KEY)
);
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
DubboExporter<?> exporter = (DubboExporter<?>) getExport(serviceKey);

if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
Expand All @@ -284,8 +284,8 @@ public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {

// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, this);
addExportMap(key, exporter);

//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Expand Down
Expand Up @@ -16,11 +16,9 @@
*/
package org.apache.dubbo.rpc.protocol.injvm;

import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.protocol.AbstractExporter;

import java.util.Map;
import org.apache.dubbo.rpc.protocol.DelegateExporterMap;

/**
* InjvmExporter
Expand All @@ -29,18 +27,18 @@ class InjvmExporter<T> extends AbstractExporter<T> {

private final String key;

private final Map<String, Exporter<?>> exporterMap;
private final DelegateExporterMap delegateExporterMap;

InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
InjvmExporter(Invoker<T> invoker, String key, DelegateExporterMap delegateExporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
exporterMap.put(key, this);
this.delegateExporterMap = delegateExporterMap;
}

@Override
public void afterUnExport() {
exporterMap.remove(key);
delegateExporterMap.removeExportMap(key, this);
}

}

Expand Up @@ -24,8 +24,7 @@
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;

import java.util.Map;
import org.apache.dubbo.rpc.protocol.DelegateExporterMap;

import static org.apache.dubbo.common.constants.CommonConstants.LOCALHOST_VALUE;

Expand All @@ -36,17 +35,17 @@ class InjvmInvoker<T> extends AbstractInvoker<T> {

private final String key;

private final Map<String, Exporter<?>> exporterMap;
private final DelegateExporterMap delegateExporterMap;

InjvmInvoker(Class<T> type, URL url, String key, Map<String, Exporter<?>> exporterMap) {
InjvmInvoker(Class<T> type, URL url, String key, DelegateExporterMap delegateExporterMap) {
super(type, url);
this.key = key;
this.exporterMap = exporterMap;
this.delegateExporterMap = delegateExporterMap;
}

@Override
public boolean isAvailable() {
InjvmExporter<?> exporter = (InjvmExporter<?>) exporterMap.get(key);
InjvmExporter<?> exporter = (InjvmExporter<?>) delegateExporterMap.getExport(key);
if (exporter == null) {
return false;
} else {
Expand All @@ -56,7 +55,7 @@ public boolean isAvailable() {

@Override
public Result doInvoke(Invocation invocation) throws Throwable {
Exporter<?> exporter = InjvmProtocol.getExporter(exporterMap, getUrl());
Exporter<?> exporter = InjvmProtocol.getExporter(delegateExporterMap, getUrl());
if (exporter == null) {
throw new RpcException("Service [" + key + "] not found.");
}
Expand Down
Expand Up @@ -18,17 +18,15 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.protocol.AbstractProtocol;
import org.apache.dubbo.rpc.protocol.DelegateExporterMap;
import org.apache.dubbo.rpc.support.ProtocolUtils;

import java.util.Map;

import static org.apache.dubbo.common.constants.CommonConstants.BROADCAST_CLUSTER;
import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY;
import static org.apache.dubbo.rpc.Constants.SCOPE_KEY;
Expand Down Expand Up @@ -58,14 +56,14 @@ public static InjvmProtocol getInjvmProtocol() {
return INSTANCE;
}

static Exporter<?> getExporter(Map<String, Exporter<?>> map, URL key) {
static Exporter<?> getExporter(DelegateExporterMap delegateExporterMap, URL key) {
Exporter<?> result = null;

if (!key.getServiceKey().contains("*")) {
result = map.get(key.getServiceKey());
result = delegateExporterMap.getExport(key.getServiceKey());
} else {
if (CollectionUtils.isNotEmptyMap(map)) {
for (Exporter<?> exporter : map.values()) {
if (!delegateExporterMap.isEmpty()) {
for (Exporter<?> exporter : delegateExporterMap.getExporters()) {
if (UrlUtils.isServiceKeyMatch(key, exporter.getInvoker().getUrl())) {
result = exporter;
break;
Expand All @@ -91,12 +89,15 @@ public int getDefaultPort() {

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
String serviceKey = invoker.getUrl().getServiceKey();
InjvmExporter<T> tInjvmExporter = new InjvmExporter<>(invoker, serviceKey, this);
addExportMap(serviceKey, tInjvmExporter);
return tInjvmExporter;
}

@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), this);
}

public boolean isInjvmRefer(URL url) {
Expand All @@ -112,7 +113,7 @@ public boolean isInjvmRefer(URL url) {
} else if (url.getParameter(GENERIC_KEY, false)) {
// generic invocation is not local reference
return false;
} else if (getExporter(exporterMap, url) != null) {
} else if (getExporter(this, url) != null) {
// Broadcast cluster means that multiple machines will be called,
// which is not converted to injvm protocol at this time.
if (BROADCAST_CLUSTER.equalsIgnoreCase(url.getParameter(CLUSTER_KEY))) {
Expand Down
Expand Up @@ -24,10 +24,12 @@
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.ProxyFactory;

import org.apache.dubbo.rpc.protocol.DelegateExporterMap;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;

Expand Down Expand Up @@ -76,7 +78,32 @@ public void testLocalProtocol() throws Exception {
assertEquals(service.getSize(new String[]{"", "", ""}), 3);
service.invoke("injvm://127.0.0.1/TestService", "invoke");

InjvmInvoker injvmInvoker = new InjvmInvoker(DemoService.class, URL.valueOf("injvm://127.0.0.1/TestService"), null, new HashMap<String, Exporter<?>>());
InjvmInvoker injvmInvoker = new InjvmInvoker(DemoService.class, URL.valueOf("injvm://127.0.0.1/TestService"), null, new DelegateExporterMap() {
@Override
public boolean isEmpty() {
return true;
}

@Override
public Exporter<?> getExport(String key) {
return null;
}

@Override
public void addExportMap(String key, Exporter<?> exporter) {

}

@Override
public void removeExportMap(String key, Exporter<?> exporter) {

}

@Override
public Collection<Exporter<?>> getExporters() {
return null;
}
});
assertFalse(injvmInvoker.isAvailable());

}
Expand Down