diff --git a/dubbo-all/pom.xml b/dubbo-all/pom.xml index 774117b228f..a51e77fe95e 100644 --- a/dubbo-all/pom.xml +++ b/dubbo-all/pom.xml @@ -143,6 +143,13 @@ compile true + + org.apache.dubbo + dubbo-remoting-zookeeper-curator5 + ${project.version} + compile + true + org.apache.dubbo dubbo-rpc-api @@ -662,6 +669,7 @@ org.apache.dubbo:dubbo-remoting-redis org.apache.dubbo:dubbo-remoting-http org.apache.dubbo:dubbo-remoting-zookeeper + org.apache.dubbo:dubbo-remoting-zookeeper-curator5 org.apache.dubbo:dubbo-rpc-api org.apache.dubbo:dubbo-rpc-dubbo org.apache.dubbo:dubbo-rpc-injvm diff --git a/dubbo-bom/pom.xml b/dubbo-bom/pom.xml index 235108f1d26..4ba5d170d85 100644 --- a/dubbo-bom/pom.xml +++ b/dubbo-bom/pom.xml @@ -173,6 +173,11 @@ dubbo-remoting-zookeeper ${project.version} + + org.apache.dubbo + dubbo-remoting-zookeeper-curator5 + ${project.version} + org.apache.dubbo dubbo-rpc-api diff --git a/dubbo-compatible/pom.xml b/dubbo-compatible/pom.xml index abb337e13b5..4297a82a87c 100644 --- a/dubbo-compatible/pom.xml +++ b/dubbo-compatible/pom.xml @@ -78,6 +78,11 @@ dubbo-remoting-zookeeper ${project.parent.version} + + org.apache.dubbo + dubbo-remoting-zookeeper-curator5 + ${project.parent.version} + org.apache.dubbo dubbo-rpc-rest diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml b/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml index c06de98f0ed..8fd6b836e4e 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml @@ -38,6 +38,11 @@ dubbo-remoting-zookeeper ${project.parent.version} + + org.apache.dubbo + dubbo-remoting-zookeeper-curator5 + ${project.parent.version} + org.apache.curator curator-test diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java index d5882d93125..6dcedd98ebf 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java @@ -19,6 +19,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.config.configcenter.AbstractDynamicConfigurationFactory; import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; +import org.apache.dubbo.common.extension.DisableInject; import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter; /** @@ -28,6 +29,11 @@ public class ZookeeperDynamicConfigurationFactory extends AbstractDynamicConfigu private ZookeeperTransporter zookeeperTransporter; + public ZookeeperDynamicConfigurationFactory() { + this.zookeeperTransporter = ZookeeperTransporter.getExtension(); + } + + @DisableInject public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { this.zookeeperTransporter = zookeeperTransporter; } diff --git a/dubbo-metadata/dubbo-metadata-report-zookeeper/src/main/java/org/apache/dubbo/metadata/store/zookeeper/ZookeeperMetadataReportFactory.java b/dubbo-metadata/dubbo-metadata-report-zookeeper/src/main/java/org/apache/dubbo/metadata/store/zookeeper/ZookeeperMetadataReportFactory.java index 0ffed8db8e9..ee3e2d97a56 100644 --- a/dubbo-metadata/dubbo-metadata-report-zookeeper/src/main/java/org/apache/dubbo/metadata/store/zookeeper/ZookeeperMetadataReportFactory.java +++ b/dubbo-metadata/dubbo-metadata-report-zookeeper/src/main/java/org/apache/dubbo/metadata/store/zookeeper/ZookeeperMetadataReportFactory.java @@ -17,6 +17,7 @@ package org.apache.dubbo.metadata.store.zookeeper; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.extension.DisableInject; import org.apache.dubbo.metadata.report.MetadataReport; import org.apache.dubbo.metadata.report.support.AbstractMetadataReportFactory; import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter; @@ -28,6 +29,11 @@ public class ZookeeperMetadataReportFactory extends AbstractMetadataReportFactor private ZookeeperTransporter zookeeperTransporter; + public ZookeeperMetadataReportFactory() { + this.zookeeperTransporter = ZookeeperTransporter.getExtension(); + } + + @DisableInject public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { this.zookeeperTransporter = zookeeperTransporter; } diff --git a/dubbo-registry/dubbo-registry-zookeeper/pom.xml b/dubbo-registry/dubbo-registry-zookeeper/pom.xml index 2d810dbfdf7..81cdbc5cffd 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/pom.xml +++ b/dubbo-registry/dubbo-registry-zookeeper/pom.xml @@ -40,6 +40,11 @@ dubbo-remoting-zookeeper ${project.parent.version} + + org.apache.dubbo + dubbo-remoting-zookeeper-curator5 + ${project.parent.version} + org.apache.curator curator-x-discovery @@ -50,4 +55,4 @@ test - \ No newline at end of file + diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistryFactory.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistryFactory.java index d702d0184cb..0308d73ab8c 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistryFactory.java +++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistryFactory.java @@ -17,22 +17,28 @@ package org.apache.dubbo.registry.zookeeper; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.extension.DisableInject; import org.apache.dubbo.registry.Registry; import org.apache.dubbo.registry.support.AbstractRegistryFactory; import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter; /** * ZookeeperRegistryFactory. - * */ public class ZookeeperRegistryFactory extends AbstractRegistryFactory { private ZookeeperTransporter zookeeperTransporter; + public ZookeeperRegistryFactory() { + this.zookeeperTransporter = ZookeeperTransporter.getExtension(); + } + /** * Invisible injection of zookeeper client via IOC/SPI + * * @param zookeeperTransporter */ + @DisableInject public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { this.zookeeperTransporter = zookeeperTransporter; } diff --git a/dubbo-remoting/dubbo-remoting-api/pom.xml b/dubbo-remoting/dubbo-remoting-api/pom.xml index 69ed09499be..9b2eb007cf5 100644 --- a/dubbo-remoting/dubbo-remoting-api/pom.xml +++ b/dubbo-remoting/dubbo-remoting-api/pom.xml @@ -47,4 +47,4 @@ test - \ No newline at end of file + diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/AbstractZookeeperClient.java similarity index 93% rename from dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java rename to dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/AbstractZookeeperClient.java index 8ee4b4bbff4..3b1e21c773d 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/AbstractZookeeperClient.java @@ -14,16 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.remoting.zookeeper.support; +package org.apache.dubbo.remoting.zookeeper; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.ConcurrentHashSet; -import org.apache.dubbo.remoting.zookeeper.ChildListener; -import org.apache.dubbo.remoting.zookeeper.DataListener; -import org.apache.dubbo.remoting.zookeeper.StateListener; -import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; import java.util.List; import java.util.Set; diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperTransporter.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/AbstractZookeeperTransporter.java similarity index 94% rename from dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperTransporter.java rename to dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/AbstractZookeeperTransporter.java index 821f4c5ea16..699b2c97217 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperTransporter.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/AbstractZookeeperTransporter.java @@ -14,15 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.remoting.zookeeper.support; +package org.apache.dubbo.remoting.zookeeper; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.constants.RemotingConstants; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.StringUtils; -import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; -import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter; import java.util.ArrayList; import java.util.Collections; @@ -90,7 +88,7 @@ public ZookeeperClient connect(URL url) { * @param addressList * @return */ - ZookeeperClient fetchAndUpdateZookeeperClientCache(List addressList) { + public ZookeeperClient fetchAndUpdateZookeeperClientCache(List addressList) { ZookeeperClient zookeeperClient = null; for (String address : addressList) { @@ -110,7 +108,7 @@ ZookeeperClient fetchAndUpdateZookeeperClientCache(List addressList) { * @param url such as:zookeeper://127.0.0.1:2181?127.0.0.1:8989,127.0.0.1:9999 * @return such as 127.0.0.1:2181,127.0.0.1:8989,127.0.0.1:9999 */ - List getURLBackupAddress(URL url) { + public List getURLBackupAddress(URL url) { List addressList = new ArrayList(); addressList.add(url.getAddress()); addressList.addAll(url.getParameter(RemotingConstants.BACKUP_KEY, Collections.EMPTY_LIST)); @@ -176,7 +174,7 @@ URL toClientURL(URL url) { * * @return */ - Map getZookeeperClientMap() { + public Map getZookeeperClientMap() { return zookeeperClientMap; } } diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ChildListener.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/ChildListener.java similarity index 100% rename from dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ChildListener.java rename to dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/ChildListener.java diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java similarity index 100% rename from dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java rename to dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java similarity index 66% rename from dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java rename to dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java index a1de0373652..19af805edc2 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java @@ -16,8 +16,6 @@ */ package org.apache.dubbo.remoting.zookeeper; -import org.apache.zookeeper.Watcher; - /** * 2019-02-26 */ @@ -32,8 +30,6 @@ public enum EventType { CONNECTION_LOST(12), INITIALIZED(10); - - private final int intValue; // Integer representation of value // for sending over wire @@ -45,21 +41,4 @@ public int getIntValue() { return intValue; } - public static Watcher.Event.EventType fromInt(int intValue) { - switch (intValue) { - case -1: - return Watcher.Event.EventType.None; - case 1: - return Watcher.Event.EventType.NodeCreated; - case 2: - return Watcher.Event.EventType.NodeDeleted; - case 3: - return Watcher.Event.EventType.NodeDataChanged; - case 4: - return Watcher.Event.EventType.NodeChildrenChanged; - - default: - throw new RuntimeException("Invalid integer value for conversion to EventType"); - } - } } diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/StateListener.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/StateListener.java similarity index 100% rename from dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/StateListener.java rename to dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/StateListener.java diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java similarity index 100% rename from dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java rename to dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperTransporter.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperTransporter.java new file mode 100644 index 00000000000..2b360f1452e --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperTransporter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.zookeeper; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.extension.ExtensionLoader; +import org.apache.dubbo.common.extension.SPI; + +import static org.apache.dubbo.common.extension.ExtensionLoader.getExtensionLoader; + +@SPI +public interface ZookeeperTransporter { + + String CURATOR_5 = "curator5"; + + String CURATOR = "curator"; + + ZookeeperClient connect(URL url); + + static ZookeeperTransporter getExtension() { + ExtensionLoader extensionLoader = getExtensionLoader(ZookeeperTransporter.class); + boolean isHighVersion = isHighVersionCurator(); + if (isHighVersion) { + return extensionLoader.getExtension(CURATOR_5); + } + return extensionLoader.getExtension(CURATOR); + } + + static boolean isHighVersionCurator() { + try { + Class.forName("org.apache.curator.framework.recipes.cache.CuratorCache"); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } + +} diff --git a/dubbo-remoting/dubbo-remoting-zookeeper-curator5/pom.xml b/dubbo-remoting/dubbo-remoting-zookeeper-curator5/pom.xml new file mode 100644 index 00000000000..a6568b55daa --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-zookeeper-curator5/pom.xml @@ -0,0 +1,67 @@ + + + 4.0.0 + + org.apache.dubbo + dubbo-remoting + ${revision} + ../pom.xml + + dubbo-remoting-zookeeper-curator5 + jar + ${project.artifactId} + The zookeeper curator5 remoting module of dubbo project + + false + 5.0.0 + 3.6.0 + + + + org.apache.dubbo + dubbo-remoting-api + ${project.parent.version} + + + org.apache.dubbo + dubbo-common + ${project.parent.version} + + + org.apache.curator + curator-recipes + ${curator5_version} + + + org.apache.curator + curator-framework + ${curator5_version} + + + org.apache.zookeeper + zookeeper + ${zookeeper_version} + + + org.apache.curator + curator-test + ${curator5_version} + test + + + diff --git a/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/main/java/org/apache/dubbo/remoting/zookeeper/curator5/Curator5ZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/main/java/org/apache/dubbo/remoting/zookeeper/curator5/Curator5ZookeeperClient.java new file mode 100644 index 00000000000..abcb4ec45d5 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/main/java/org/apache/dubbo/remoting/zookeeper/curator5/Curator5ZookeeperClient.java @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.zookeeper.curator5; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.remoting.zookeeper.AbstractZookeeperClient; +import org.apache.dubbo.remoting.zookeeper.ChildListener; +import org.apache.dubbo.remoting.zookeeper.DataListener; +import org.apache.dubbo.remoting.zookeeper.EventType; +import org.apache.dubbo.remoting.zookeeper.StateListener; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryNTimes; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY; + +public class Curator5ZookeeperClient extends AbstractZookeeperClient { + + protected static final Logger logger = LoggerFactory.getLogger(Curator5ZookeeperClient.class); + private static final String ZK_SESSION_EXPIRE_KEY = "zk.session.expire"; + + static final Charset CHARSET = StandardCharsets.UTF_8; + private final CuratorFramework client; + private static Map nodeCacheMap = new ConcurrentHashMap<>(); + + public Curator5ZookeeperClient(URL url) { + super(url); + try { + int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS); + int sessionExpireMs = url.getParameter(ZK_SESSION_EXPIRE_KEY, DEFAULT_SESSION_TIMEOUT_MS); + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() + .connectString(url.getBackupAddress()) + .retryPolicy(new RetryNTimes(1, 1000)) + .connectionTimeoutMs(timeout) + .sessionTimeoutMs(sessionExpireMs); + String authority = url.getAuthority(); + if (authority != null && authority.length() > 0) { + builder = builder.authorization("digest", authority.getBytes()); + } + client = builder.build(); + client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url)); + client.start(); + boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS); + if (!connected) { + throw new IllegalStateException("zookeeper not connected"); + } + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + @Override + public void createPersistent(String path) { + try { + client.create().forPath(path); + } catch (NodeExistsException e) { + logger.warn("ZNode " + path + " already exists.", e); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + @Override + public void createEphemeral(String path) { + try { + client.create().withMode(CreateMode.EPHEMERAL).forPath(path); + } catch (NodeExistsException e) { + logger.warn("ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" + + ", this duplication might be caused by a delete delay from the zk server, which means the old expired session" + + " may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " + + "we can just try to delete and create again.", e); + deletePath(path); + createEphemeral(path); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + @Override + protected void createPersistent(String path, String data) { + byte[] dataBytes = data.getBytes(CHARSET); + try { + client.create().forPath(path, dataBytes); + } catch (NodeExistsException e) { + try { + client.setData().forPath(path, dataBytes); + } catch (Exception e1) { + throw new IllegalStateException(e.getMessage(), e1); + } + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + @Override + protected void createEphemeral(String path, String data) { + byte[] dataBytes = data.getBytes(CHARSET); + try { + client.create().withMode(CreateMode.EPHEMERAL).forPath(path, dataBytes); + } catch (NodeExistsException e) { + logger.warn("ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" + + ", this duplication might be caused by a delete delay from the zk server, which means the old expired session" + + " may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " + + "we can just try to delete and create again.", e); + deletePath(path); + createEphemeral(path, data); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + @Override + protected void deletePath(String path) { + try { + client.delete().deletingChildrenIfNeeded().forPath(path); + } catch (NoNodeException ignored) { + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + @Override + public List getChildren(String path) { + try { + return client.getChildren().forPath(path); + } catch (NoNodeException e) { + return null; + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + @Override + public boolean checkExists(String path) { + try { + if (client.checkExists().forPath(path) != null) { + return true; + } + } catch (Exception e) { + } + return false; + } + + @Override + public boolean isConnected() { + return client.getZookeeperClient().isConnected(); + } + + @Override + public String doGetContent(String path) { + try { + byte[] dataBytes = client.getData().forPath(path); + return (dataBytes == null || dataBytes.length == 0) ? null : new String(dataBytes, CHARSET); + } catch (NoNodeException e) { + // ignore NoNode Exception. + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + return null; + } + + @Override + public void doClose() { + client.close(); + } + + @Override + public Curator5ZookeeperClient.CuratorWatcherImpl createTargetChildListener(String path, ChildListener listener) { + return new Curator5ZookeeperClient.CuratorWatcherImpl(client, listener, path); + } + + @Override + protected List addTargetChildListener(String path, CuratorWatcherImpl listener) { + try { + return client.getChildren().usingWatcher(listener).forPath(path); + } catch (NoNodeException e) { + return null; + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + @Override + protected Curator5ZookeeperClient.NodeCacheListenerImpl createTargetDataListener(String path, DataListener listener) { + return new NodeCacheListenerImpl(client, listener, path); + } + + @Override + protected void addTargetDataListener(String path, Curator5ZookeeperClient.NodeCacheListenerImpl nodeCacheListener) { + this.addTargetDataListener(path, nodeCacheListener, null); + } + + @Override + protected void addTargetDataListener(String path, Curator5ZookeeperClient.NodeCacheListenerImpl nodeCacheListener, Executor executor) { + try { + NodeCache nodeCache = new NodeCache(client, path); + if (nodeCacheMap.putIfAbsent(path, nodeCache) != null) { + return; + } + if (executor == null) { + nodeCache.getListenable().addListener(nodeCacheListener); + } else { + nodeCache.getListenable().addListener(nodeCacheListener, executor); + } + + nodeCache.start(); + } catch (Exception e) { + throw new IllegalStateException("Add nodeCache listener for path:" + path, e); + } + } + + @Override + protected void removeTargetDataListener(String path, Curator5ZookeeperClient.NodeCacheListenerImpl nodeCacheListener) { + NodeCache nodeCache = nodeCacheMap.get(path); + if (nodeCache != null) { + nodeCache.getListenable().removeListener(nodeCacheListener); + } + nodeCacheListener.dataListener = null; + } + + @Override + public void removeTargetChildListener(String path, CuratorWatcherImpl listener) { + listener.unwatch(); + } + + static class NodeCacheListenerImpl implements NodeCacheListener { + + private CuratorFramework client; + + private volatile DataListener dataListener; + + private String path; + + protected NodeCacheListenerImpl() { + } + + public NodeCacheListenerImpl(CuratorFramework client, DataListener dataListener, String path) { + this.client = client; + this.dataListener = dataListener; + this.path = path; + } + + @Override + public void nodeChanged() throws Exception { + ChildData childData = nodeCacheMap.get(path).getCurrentData(); + String content = null; + EventType eventType; + if (childData == null) { + eventType = EventType.NodeDeleted; + } else { + content = new String(childData.getData(), CHARSET); + eventType = EventType.NodeDataChanged; + } + dataListener.dataChanged(path, content, eventType); + } + } + + static class CuratorWatcherImpl implements CuratorWatcher { + + private CuratorFramework client; + private volatile ChildListener childListener; + private String path; + + public CuratorWatcherImpl(CuratorFramework client, ChildListener listener, String path) { + this.client = client; + this.childListener = listener; + this.path = path; + } + + protected CuratorWatcherImpl() { + } + + public void unwatch() { + this.childListener = null; + } + + @Override + public void process(WatchedEvent event) throws Exception { + // if client connect or disconnect to server, zookeeper will queue + // watched event(Watcher.Event.EventType.None, .., path = null). + if (event.getType() == Watcher.Event.EventType.None) { + return; + } + + if (childListener != null) { + childListener.childChanged(path, client.getChildren().usingWatcher(this).forPath(path)); + } + } + } + + private class CuratorConnectionStateListener implements ConnectionStateListener { + private final long UNKNOWN_SESSION_ID = -1L; + + private long lastSessionId; + private int timeout; + private int sessionExpireMs; + + public CuratorConnectionStateListener(URL url) { + this.timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS); + this.sessionExpireMs = url.getParameter(ZK_SESSION_EXPIRE_KEY, DEFAULT_SESSION_TIMEOUT_MS); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState state) { + long sessionId = UNKNOWN_SESSION_ID; + try { + sessionId = client.getZookeeperClient().getZooKeeper().getSessionId(); + } catch (Exception e) { + logger.warn("Curator client state changed, but failed to get the related zk session instance."); + } + + if (state == ConnectionState.LOST) { + logger.warn("Curator zookeeper session " + Long.toHexString(lastSessionId) + " expired."); + Curator5ZookeeperClient.this.stateChanged(StateListener.SESSION_LOST); + } else if (state == ConnectionState.SUSPENDED) { + logger.warn("Curator zookeeper connection of session " + Long.toHexString(sessionId) + " timed out. " + + "connection timeout value is " + timeout + ", session expire timeout value is " + sessionExpireMs); + Curator5ZookeeperClient.this.stateChanged(StateListener.SUSPENDED); + } else if (state == ConnectionState.CONNECTED) { + lastSessionId = sessionId; + logger.info("Curator zookeeper client instance initiated successfully, session id is " + Long.toHexString(sessionId)); + Curator5ZookeeperClient.this.stateChanged(StateListener.CONNECTED); + } else if (state == ConnectionState.RECONNECTED) { + if (lastSessionId == sessionId && sessionId != UNKNOWN_SESSION_ID) { + logger.warn("Curator zookeeper connection recovered from connection lose, " + + "reuse the old session " + Long.toHexString(sessionId)); + Curator5ZookeeperClient.this.stateChanged(StateListener.RECONNECTED); + } else { + logger.warn("New session created after old session lost, " + + "old session " + Long.toHexString(lastSessionId) + ", new session " + Long.toHexString(sessionId)); + lastSessionId = sessionId; + Curator5ZookeeperClient.this.stateChanged(StateListener.NEW_SESSION_CREATED); + } + } + } + + } + + /** + * just for unit test + * + * @return + */ + public CuratorFramework getClient() { + return client; + } +} diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperTransporter.java b/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/main/java/org/apache/dubbo/remoting/zookeeper/curator5/Curator5ZookeeperTransporter.java similarity index 68% rename from dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperTransporter.java rename to dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/main/java/org/apache/dubbo/remoting/zookeeper/curator5/Curator5ZookeeperTransporter.java index 638f3ed29f6..ba29a59801d 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperTransporter.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/main/java/org/apache/dubbo/remoting/zookeeper/curator5/Curator5ZookeeperTransporter.java @@ -14,17 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.remoting.zookeeper; +package org.apache.dubbo.remoting.zookeeper.curator5; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.extension.Adaptive; -import org.apache.dubbo.common.extension.SPI; -import org.apache.dubbo.remoting.Constants; +import org.apache.dubbo.remoting.zookeeper.AbstractZookeeperTransporter; +import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; -@SPI("curator") -public interface ZookeeperTransporter { +public class Curator5ZookeeperTransporter extends AbstractZookeeperTransporter { + @Override + public ZookeeperClient createZookeeperClient(URL url) { + return new Curator5ZookeeperClient(url); + } - @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) - ZookeeperClient connect(URL url); } diff --git a/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter b/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter new file mode 100644 index 00000000000..5c2f7b85d7e --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter @@ -0,0 +1 @@ +curator5=org.apache.dubbo.remoting.zookeeper.curator5.Curator5ZookeeperTransporter diff --git a/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/test/java/org/apache/dubbo/remoting/zookeeper/curator5/Curator5ZookeeperClientTest.java b/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/test/java/org/apache/dubbo/remoting/zookeeper/curator5/Curator5ZookeeperClientTest.java new file mode 100644 index 00000000000..33e4154d864 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/test/java/org/apache/dubbo/remoting/zookeeper/curator5/Curator5ZookeeperClientTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.zookeeper.curator5; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.remoting.zookeeper.ChildListener; +import org.apache.dubbo.remoting.zookeeper.curator5.Curator5ZookeeperClient; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingServer; +import org.apache.zookeeper.WatchedEvent; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.mock; + +public class Curator5ZookeeperClientTest { + private TestingServer zkServer; + private Curator5ZookeeperClient curatorClient; + CuratorFramework client = null; + + @BeforeEach + public void setUp() throws Exception { + int zkServerPort = NetUtils.getAvailablePort(); + zkServer = new TestingServer(zkServerPort, true); + curatorClient = new Curator5ZookeeperClient(URL.valueOf("zookeeper://127.0.0.1:" + + zkServerPort + "/org.apache.dubbo.registry.RegistryService")); + client = CuratorFrameworkFactory.newClient(zkServer.getConnectString(), new ExponentialBackoffRetry(1000, 3)); + client.start(); + } + + @Test + public void testCheckExists() { + String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; + curatorClient.create(path, false); + assertThat(curatorClient.checkExists(path), is(true)); + assertThat(curatorClient.checkExists(path + "/noneexits"), is(false)); + } + + @Test + public void testChildrenPath() { + String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; + curatorClient.create(path, false); + curatorClient.create(path + "/provider1", false); + curatorClient.create(path + "/provider2", false); + + List children = curatorClient.getChildren(path); + assertThat(children.size(), is(2)); + } + + @Test + public void testChildrenListener() throws InterruptedException { + String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; + curatorClient.create(path, false); + final CountDownLatch countDownLatch = new CountDownLatch(1); + curatorClient.addTargetChildListener(path, new Curator5ZookeeperClient.CuratorWatcherImpl() { + + @Override + public void process(WatchedEvent watchedEvent) throws Exception { + countDownLatch.countDown(); + } + }); + curatorClient.createPersistent(path + "/provider1"); + countDownLatch.await(); + } + + + @Test + public void testWithInvalidServer() { + Assertions.assertThrows(IllegalStateException.class, () -> { + curatorClient = new Curator5ZookeeperClient(URL.valueOf("zookeeper://127.0.0.1:1/service")); + curatorClient.create("/testPath", true); + }); + } + + @Test + public void testWithStoppedServer() throws IOException { + Assertions.assertThrows(IllegalStateException.class, () -> { + curatorClient.create("/testPath", true); + zkServer.stop(); + curatorClient.delete("/testPath"); + }); + } + + @Test + public void testRemoveChildrenListener() { + ChildListener childListener = mock(ChildListener.class); + curatorClient.addChildListener("/children", childListener); + curatorClient.removeChildListener("/children", childListener); + } + + @Test + public void testCreateExistingPath() { + curatorClient.create("/pathOne", false); + curatorClient.create("/pathOne", false); + } + + @Test + public void testConnectedStatus() { + curatorClient.createEphemeral("/testPath"); + boolean connected = curatorClient.isConnected(); + assertThat(connected, is(true)); + } + + @Test + public void testCreateContent4Persistent() { + String path = "/curatorTest4CrContent/content.data"; + String content = "createContentTest"; + curatorClient.delete(path); + assertThat(curatorClient.checkExists(path), is(false)); + assertNull(curatorClient.getContent(path)); + + curatorClient.create(path, content, false); + assertThat(curatorClient.checkExists(path), is(true)); + assertEquals(curatorClient.getContent(path), content); + } + + @Test + public void testCreateContent4Temp() { + String path = "/curatorTest4CrContent/content.data"; + String content = "createContentTest"; + curatorClient.delete(path); + assertThat(curatorClient.checkExists(path), is(false)); + assertNull(curatorClient.getContent(path)); + + curatorClient.create(path, content, true); + assertThat(curatorClient.checkExists(path), is(true)); + assertEquals(curatorClient.getContent(path), content); + } + + @AfterEach + public void tearDown() throws Exception { + curatorClient.close(); + zkServer.stop(); + } + + @Test + public void testAddTargetDataListener() throws Exception { + String listenerPath = "/dubbo/service.name/configuration"; + String path = listenerPath + "/dat/data"; + String value = "vav"; + + curatorClient.create(path + "/d.json", value, true); + String valueFromCache = curatorClient.getContent(path + "/d.json"); + Assertions.assertEquals(value, valueFromCache); + final AtomicInteger atomicInteger = new AtomicInteger(0); + curatorClient.addTargetDataListener(path + "/d.json", new Curator5ZookeeperClient.NodeCacheListenerImpl() { + + @Override + public void nodeChanged() throws Exception { + atomicInteger.incrementAndGet(); + } + }); + + valueFromCache = curatorClient.getContent(path + "/d.json"); + Assertions.assertNotNull(valueFromCache); + + Thread.sleep(100); + curatorClient.getClient().setData().forPath(path + "/d.json", "foo".getBytes()); + Thread.sleep(100); + curatorClient.getClient().setData().forPath(path + "/d.json", "bar".getBytes()); + curatorClient.delete(path + "/d.json"); + valueFromCache = curatorClient.getContent(path + "/d.json"); + Assertions.assertNull(valueFromCache); + Thread.sleep(2000L); + Assertions.assertTrue(3L <= atomicInteger.get()); + } +} diff --git a/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/test/java/org/apache/dubbo/remoting/zookeeper/curator5/Curator5ZookeeperTransporterTest.java b/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/test/java/org/apache/dubbo/remoting/zookeeper/curator5/Curator5ZookeeperTransporterTest.java new file mode 100644 index 00000000000..44c8b7b9d84 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/test/java/org/apache/dubbo/remoting/zookeeper/curator5/Curator5ZookeeperTransporterTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.zookeeper.curator5; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; + +import org.apache.curator.test.TestingServer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsNot.not; +import static org.hamcrest.core.IsNull.nullValue; + +public class Curator5ZookeeperTransporterTest { + private TestingServer zkServer; + private ZookeeperClient zookeeperClient; + private Curator5ZookeeperTransporter curatorZookeeperTransporter; + private int zkServerPort; + + @BeforeEach + public void setUp() throws Exception { + zkServerPort = NetUtils.getAvailablePort(); + zkServer = new TestingServer(zkServerPort, true); + zookeeperClient = new Curator5ZookeeperTransporter().connect(URL.valueOf("zookeeper://127.0.0.1:" + + zkServerPort + "/service")); + curatorZookeeperTransporter = new Curator5ZookeeperTransporter(); + } + + @Test + public void testZookeeperClient() { + assertThat(zookeeperClient, not(nullValue())); + zookeeperClient.close(); + } + + @AfterEach + public void tearDown() throws Exception { + zkServer.stop(); + } +} diff --git a/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/test/java/org/apache/dubbo/remoting/zookeeper/curator5/support/AbstractZookeeperTransporterTest.java b/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/test/java/org/apache/dubbo/remoting/zookeeper/curator5/support/AbstractZookeeperTransporterTest.java new file mode 100644 index 00000000000..5bb45cfd265 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/test/java/org/apache/dubbo/remoting/zookeeper/curator5/support/AbstractZookeeperTransporterTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.zookeeper.curator5.support; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.remoting.zookeeper.AbstractZookeeperTransporter; +import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; +import org.apache.dubbo.remoting.zookeeper.curator5.Curator5ZookeeperTransporter; + +import org.apache.curator.test.TestingServer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsNot.not; +import static org.hamcrest.core.IsNull.nullValue; + +/** + * AbstractZookeeperTransporterTest + */ +public class AbstractZookeeperTransporterTest { + private TestingServer zkServer; + private ZookeeperClient zookeeperClient; + private AbstractZookeeperTransporter abstractZookeeperTransporter; + private int zkServerPort; + + @BeforeEach + public void setUp() throws Exception { + zkServerPort = NetUtils.getAvailablePort(); + zkServer = new TestingServer(zkServerPort, true); + zookeeperClient = new Curator5ZookeeperTransporter().connect(URL.valueOf("zookeeper://127.0.0.1:" + + zkServerPort + "/service")); + abstractZookeeperTransporter = new Curator5ZookeeperTransporter(); + } + + + @AfterEach + public void tearDown() throws Exception { + zkServer.stop(); + } + + @Test + public void testZookeeperClient() { + assertThat(zookeeperClient, not(nullValue())); + zookeeperClient.close(); + } + + @Test + public void testGetURLBackupAddress() { + URL url = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.registry.RegistryService?backup=127.0.0.1:" + 9099 + "&application=metadatareport-local-xml-provider2&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=47418&specVersion=2.7.0-SNAPSHOT×tamp=1547102428828"); + List stringList = abstractZookeeperTransporter.getURLBackupAddress(url); + Assertions.assertEquals(stringList.size(), 2); + Assertions.assertEquals(stringList.get(0), "127.0.0.1:" + zkServerPort); + Assertions.assertEquals(stringList.get(1), "127.0.0.1:9099"); + } + + @Test + public void testGetURLBackupAddressNoBack() { + URL url = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.registry.RegistryService?application=metadatareport-local-xml-provider2&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=47418&specVersion=2.7.0-SNAPSHOT×tamp=1547102428828"); + List stringList = abstractZookeeperTransporter.getURLBackupAddress(url); + Assertions.assertEquals(stringList.size(), 1); + Assertions.assertEquals(stringList.get(0), "127.0.0.1:" + zkServerPort); + } + + @Test + public void testFetchAndUpdateZookeeperClientCache() throws Exception { + int zkServerPort2 = NetUtils.getAvailablePort(); + TestingServer zkServer2 = new TestingServer(zkServerPort2, true); + + int zkServerPort3 = NetUtils.getAvailablePort(); + TestingServer zkServer3 = new TestingServer(zkServerPort3, true); + + URL url = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.registry.RegistryService?backup=127.0.0.1:" + zkServerPort3 + ",127.0.0.1:" + zkServerPort2 + "&application=metadatareport-local-xml-provider2&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=47418&specVersion=2.7.0-SNAPSHOT×tamp=1547102428828"); + ZookeeperClient newZookeeperClient = abstractZookeeperTransporter.connect(url); + //just for connected + newZookeeperClient.getContent("/dubbo/test"); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 3); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort), newZookeeperClient); + + URL url2 = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.metadata.store.MetadataReport?address=zookeeper://127.0.0.1:2181&application=metadatareport-local-xml-provider2&cycle-report=false&interface=org.apache.dubbo.metadata.store.MetadataReport&retry-period=4590&retry-times=23&sync-report=true"); + checkFetchAndUpdateCacheNotNull(url2); + URL url3 = URL.valueOf("zookeeper://127.0.0.1:8778/org.apache.dubbo.metadata.store.MetadataReport?backup=127.0.0.1:" + zkServerPort3 + "&address=zookeeper://127.0.0.1:2181&application=metadatareport-local-xml-provider2&cycle-report=false&interface=org.apache.dubbo.metadata.store.MetadataReport&retry-period=4590&retry-times=23&sync-report=true"); + checkFetchAndUpdateCacheNotNull(url3); + + zkServer2.stop(); + zkServer3.stop(); + } + + private void checkFetchAndUpdateCacheNotNull(URL url) { + List addressList = abstractZookeeperTransporter.getURLBackupAddress(url); + ZookeeperClient zookeeperClient = abstractZookeeperTransporter.fetchAndUpdateZookeeperClientCache(addressList); + Assertions.assertNotNull(zookeeperClient); + } + + @Test + public void testRepeatConnect() { + URL url = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.registry.RegistryService?application=metadatareport-local-xml-provider2&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=47418&specVersion=2.7.0-SNAPSHOT×tamp=1547102428828"); + URL url2 = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.metadata.store.MetadataReport?address=zookeeper://127.0.0.1:2181&application=metadatareport-local-xml-provider2&cycle-report=false&interface=org.apache.dubbo.metadata.store.MetadataReport&retry-period=4590&retry-times=23&sync-report=true"); + ZookeeperClient newZookeeperClient = abstractZookeeperTransporter.connect(url); + //just for connected + newZookeeperClient.getContent("/dubbo/test"); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 1); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort), newZookeeperClient); + Assertions.assertTrue(newZookeeperClient.isConnected()); + + ZookeeperClient newZookeeperClient2 = abstractZookeeperTransporter.connect(url2); + //just for connected + newZookeeperClient2.getContent("/dubbo/test"); + Assertions.assertEquals(newZookeeperClient, newZookeeperClient2); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 1); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort), newZookeeperClient); + } + + @Test + public void testNotRepeatConnect() throws Exception { + int zkServerPort2 = NetUtils.getAvailablePort(); + TestingServer zkServer2 = new TestingServer(zkServerPort2, true); + + URL url = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.registry.RegistryService?application=metadatareport-local-xml-provider2&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=47418&specVersion=2.7.0-SNAPSHOT×tamp=1547102428828"); + URL url2 = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort2 + "/org.apache.dubbo.metadata.store.MetadataReport?address=zookeeper://127.0.0.1:2181&application=metadatareport-local-xml-provider2&cycle-report=false&interface=org.apache.dubbo.metadata.store.MetadataReport&retry-period=4590&retry-times=23&sync-report=true"); + ZookeeperClient newZookeeperClient = abstractZookeeperTransporter.connect(url); + //just for connected + newZookeeperClient.getContent("/dubbo/test"); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 1); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort), newZookeeperClient); + + ZookeeperClient newZookeeperClient2 = abstractZookeeperTransporter.connect(url2); + //just for connected + newZookeeperClient2.getContent("/dubbo/test"); + Assertions.assertNotEquals(newZookeeperClient, newZookeeperClient2); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 2); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort2), newZookeeperClient2); + + zkServer2.stop(); + } + + @Test + public void testRepeatConnectForBackUpAdd() throws Exception { + int zkServerPort2 = NetUtils.getAvailablePort(); + TestingServer zkServer2 = new TestingServer(zkServerPort2, true); + + int zkServerPort3 = NetUtils.getAvailablePort(); + TestingServer zkServer3 = new TestingServer(zkServerPort3, true); + + URL url = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.registry.RegistryService?backup=127.0.0.1:" + zkServerPort2 + "&application=metadatareport-local-xml-provider2&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=47418&specVersion=2.7.0-SNAPSHOT×tamp=1547102428828"); + URL url2 = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort2 + "/org.apache.dubbo.metadata.store.MetadataReport?backup=127.0.0.1:" + zkServerPort3 + "&address=zookeeper://127.0.0.1:2181&application=metadatareport-local-xml-provider2&cycle-report=false&interface=org.apache.dubbo.metadata.store.MetadataReport&retry-period=4590&retry-times=23&sync-report=true"); + ZookeeperClient newZookeeperClient = abstractZookeeperTransporter.connect(url); + //just for connected + newZookeeperClient.getContent("/dubbo/test"); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 2); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort), newZookeeperClient); + + ZookeeperClient newZookeeperClient2 = abstractZookeeperTransporter.connect(url2); + //just for connected + newZookeeperClient2.getContent("/dubbo/test"); + Assertions.assertEquals(newZookeeperClient, newZookeeperClient2); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 3); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort2), newZookeeperClient2); + + zkServer2.stop(); + zkServer3.stop(); + } + + @Test + public void testRepeatConnectForNoMatchBackUpAdd() throws Exception { + int zkServerPort2 = NetUtils.getAvailablePort(); + TestingServer zkServer2 = new TestingServer(zkServerPort2, true); + + int zkServerPort3 = NetUtils.getAvailablePort(); + TestingServer zkServer3 = new TestingServer(zkServerPort3, true); + + URL url = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.registry.RegistryService?backup=127.0.0.1:" + zkServerPort3 + "&application=metadatareport-local-xml-provider2&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=47418&specVersion=2.7.0-SNAPSHOT×tamp=1547102428828"); + URL url2 = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort2 + "/org.apache.dubbo.metadata.store.MetadataReport?address=zookeeper://127.0.0.1:2181&application=metadatareport-local-xml-provider2&cycle-report=false&interface=org.apache.dubbo.metadata.store.MetadataReport&retry-period=4590&retry-times=23&sync-report=true"); + ZookeeperClient newZookeeperClient = abstractZookeeperTransporter.connect(url); + //just for connected + newZookeeperClient.getContent("/dubbo/test"); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 2); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort), newZookeeperClient); + + ZookeeperClient newZookeeperClient2 = abstractZookeeperTransporter.connect(url2); + //just for connected + newZookeeperClient2.getContent("/dubbo/test"); + Assertions.assertNotEquals(newZookeeperClient, newZookeeperClient2); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 3); + Assertions.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort2), newZookeeperClient2); + + zkServer2.stop(); + zkServer3.stop(); + } + + @Test + public void testSameHostWithDifferentUser() throws Exception { + int zkPort1 = NetUtils.getAvailablePort(); + int zkPort2 = NetUtils.getAvailablePort(); + try (TestingServer zkServer1 = new TestingServer(zkPort1, true)) { + try (TestingServer zkServer2 = new TestingServer(zkPort2, true)) { + URL url1 = URL.valueOf("zookeeper://us1:pw1@127.0.0.1:" + zkPort1 + "/path1"); + URL url2 = URL.valueOf("zookeeper://us2:pw2@127.0.0.1:" + zkPort1 + "/path2"); + + ZookeeperClient client1 = abstractZookeeperTransporter.connect(url1); + ZookeeperClient client2 = abstractZookeeperTransporter.connect(url2); + + assertThat(client1, not(client2)); + } + } + } +} diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml b/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml index d97f8486f06..3d217e9967d 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml +++ b/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml @@ -25,7 +25,7 @@ dubbo-remoting-zookeeper jar ${project.artifactId} - The zookeeper remoting module of dubbo project + The zookeeper curator remoting module of dubbo project false diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java index 1971017af6f..d23f91e80af 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java @@ -23,7 +23,7 @@ import org.apache.dubbo.remoting.zookeeper.DataListener; import org.apache.dubbo.remoting.zookeeper.EventType; import org.apache.dubbo.remoting.zookeeper.StateListener; -import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient; +import org.apache.dubbo.remoting.zookeeper.AbstractZookeeperClient; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperTransporter.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperTransporter.java index 21a2e095422..a18b78545f9 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperTransporter.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperTransporter.java @@ -18,7 +18,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; -import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperTransporter; +import org.apache.dubbo.remoting.zookeeper.AbstractZookeeperTransporter; public class CuratorZookeeperTransporter extends AbstractZookeeperTransporter { @Override diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperTransporterTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/support/AbstractZookeeperTransporterTest.java similarity index 99% rename from dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperTransporterTest.java rename to dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/support/AbstractZookeeperTransporterTest.java index 3662d8ffa55..53c7ebe5d13 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperTransporterTest.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/support/AbstractZookeeperTransporterTest.java @@ -14,10 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.remoting.zookeeper.support; +package org.apache.dubbo.remoting.zookeeper.curator.support; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.remoting.zookeeper.AbstractZookeeperTransporter; import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; import org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperTransporter; diff --git a/dubbo-remoting/pom.xml b/dubbo-remoting/pom.xml index 1b0524ef9f6..830884c5fc9 100644 --- a/dubbo-remoting/pom.xml +++ b/dubbo-remoting/pom.xml @@ -38,8 +38,9 @@ dubbo-remoting-p2p dubbo-remoting-http dubbo-remoting-zookeeper + dubbo-remoting-zookeeper-curator5 dubbo-remoting-netty4 dubbo-remoting-etcd3 dubbo-remoting-redis - \ No newline at end of file +