diff --git a/curator-test/src/main/java/org/apache/curator/test/InstanceSpec.java b/curator-test/src/main/java/org/apache/curator/test/InstanceSpec.java index b1d80e6d6e..375d8801d8 100644 --- a/curator-test/src/main/java/org/apache/curator/test/InstanceSpec.java +++ b/curator-test/src/main/java/org/apache/curator/test/InstanceSpec.java @@ -205,6 +205,10 @@ public int getQuorumPort() return quorumPort; } + /** + * @deprecated use {@link TestingServer#getConnectString()} or {@link TestingCluster#getConnectString()} instead + */ + @Deprecated public String getConnectString() { return hostname + ":" + port; diff --git a/curator-test/src/main/java/org/apache/curator/test/QuorumConfigBuilder.java b/curator-test/src/main/java/org/apache/curator/test/QuorumConfigBuilder.java index 42f7b27bd5..35c3523e35 100644 --- a/curator-test/src/main/java/org/apache/curator/test/QuorumConfigBuilder.java +++ b/curator-test/src/main/java/org/apache/curator/test/QuorumConfigBuilder.java @@ -97,7 +97,13 @@ public void close() public QuorumPeerConfig buildConfig(int instanceIndex) throws Exception { - Properties properties = buildConfigProperties(instanceIndex); + InstanceSpec spec = instanceSpecs.get(instanceIndex); + return buildConfig(instanceIndex, spec.getPort()); + } + + public QuorumPeerConfig buildConfig(int instanceIndex, int instancePort) throws Exception + { + Properties properties = buildConfigProperties(instanceIndex, instancePort); QuorumPeerConfig config = new QuorumPeerConfig() { { @@ -112,6 +118,12 @@ public QuorumPeerConfig buildConfig(int instanceIndex) throws Exception } public Properties buildConfigProperties(int instanceIndex) throws Exception + { + InstanceSpec spec = instanceSpecs.get(instanceIndex); + return buildConfigProperties(instanceIndex, spec.getPort()); + } + + public Properties buildConfigProperties(int instanceIndex, int instancePort) throws Exception { boolean isCluster = (instanceSpecs.size() > 1); InstanceSpec spec = instanceSpecs.get(instanceIndex); @@ -128,7 +140,7 @@ public Properties buildConfigProperties(int instanceIndex) throws Exception properties.setProperty("initLimit", "10"); properties.setProperty("syncLimit", "5"); properties.setProperty("dataDir", spec.getDataDirectory().getCanonicalPath()); - properties.setProperty("clientPort", Integer.toString(spec.getPort())); + properties.setProperty("clientPort", Integer.toString(instancePort)); String tickTime = Integer.toString((spec.getTickTime() >= 0) ? spec.getTickTime() : new Timing2().tickTime()); properties.setProperty("tickTime", tickTime); properties.setProperty("minSessionTimeout", tickTime); @@ -142,7 +154,8 @@ public Properties buildConfigProperties(int instanceIndex) throws Exception { for ( InstanceSpec thisSpec : instanceSpecs ) { - properties.setProperty("server." + thisSpec.getServerId(), String.format("%s:%d:%d;%s:%d", thisSpec.getHostname(), thisSpec.getQuorumPort(), thisSpec.getElectionPort(), thisSpec.getHostname(), thisSpec.getPort())); + int clientPort = thisSpec == spec ? instancePort : thisSpec.getPort(); + properties.setProperty("server." + thisSpec.getServerId(), String.format("%s:%d:%d;%s:%d", thisSpec.getHostname(), thisSpec.getQuorumPort(), thisSpec.getElectionPort(), thisSpec.getHostname(), clientPort)); } } Map customProperties = spec.getCustomProperties(); @@ -152,4 +165,8 @@ public Properties buildConfigProperties(int instanceIndex) throws Exception return properties; } + + public QuorumPeerConfigBuilder bindInstance(int instanceIndex, int instancePort) { + return new QuorumPeerConfigBuilder(this, instanceIndex, instancePort); + } } diff --git a/curator-test/src/main/java/org/apache/curator/test/QuorumPeerConfigBuilder.java b/curator-test/src/main/java/org/apache/curator/test/QuorumPeerConfigBuilder.java new file mode 100644 index 0000000000..aef3b83ab2 --- /dev/null +++ b/curator-test/src/main/java/org/apache/curator/test/QuorumPeerConfigBuilder.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.test; + +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; + +import java.util.Properties; + +public class QuorumPeerConfigBuilder { + private final QuorumConfigBuilder configBuilder; + private final int instanceIndex; + private final int instancePort; + + QuorumPeerConfigBuilder(QuorumConfigBuilder configBuilder, int instanceIndex, int instancePort) { + this.configBuilder = configBuilder; + this.instanceIndex = instanceIndex; + this.instancePort = instancePort; + } + + public boolean isFromRandom() { + return configBuilder.isFromRandom(); + } + + public InstanceSpec getInstanceSpec() { + return configBuilder.getInstanceSpec(instanceIndex); + } + + public QuorumPeerConfig buildConfig() throws Exception { + return configBuilder.buildConfig(instanceIndex, instancePort); + } + + public Properties buildProperties() throws Exception { + return configBuilder.buildConfigProperties(instanceIndex, instancePort); + } +} diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java index a54e8cb335..d237f35627 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java @@ -22,7 +22,6 @@ import java.nio.channels.ServerSocketChannel; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.quorum.QuorumPeer; -import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import org.apache.zookeeper.server.quorum.QuorumPeerMain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,9 +31,6 @@ class TestingQuorumPeerMain extends QuorumPeerMain implements ZooKeeperMainFace private static final Logger log = LoggerFactory.getLogger(TestingQuorumPeerMain.class); private volatile boolean isClosed = false; - private volatile QuorumConfigBuilder configBuilder; - private volatile int instanceIndex; - @Override public void kill() { @@ -92,30 +88,20 @@ private void blockUntilStarted() } @Override - public void configure(QuorumConfigBuilder configBuilder, int instanceIndex) { - this.configBuilder = configBuilder; - this.instanceIndex = instanceIndex; - } - - @Override - public QuorumPeerConfig getConfig() throws Exception { - if (configBuilder != null) { - return configBuilder.buildConfig(instanceIndex); - } - - return null; - } - - @Override - public void start() { + public void start(QuorumPeerConfigBuilder configBuilder) { new Thread(() -> { try { - runFromConfig(getConfig()); + runFromConfig(configBuilder.buildConfig()); } catch (Exception e) { - log.error("From testing server (random state: {}) for instance: {}", configBuilder.isFromRandom(), configBuilder.getInstanceSpec(instanceIndex), e); + log.error("From testing server (random state: {}) for instance: {}", configBuilder.isFromRandom(), configBuilder.getInstanceSpec(), e); } }).start(); blockUntilStarted(); } + + @Override + public int getClientPort() { + return quorumPeer == null ? -1 : quorumPeer.getClientPort(); + } } diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingServer.java b/curator-test/src/main/java/org/apache/curator/test/TestingServer.java index 9d24243575..4a50afcf4e 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingServer.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingServer.java @@ -101,7 +101,7 @@ public TestingServer(int port, File tempDirectory) throws Exception */ public TestingServer(int port, File tempDirectory, boolean start) throws Exception { - this(new InstanceSpec(tempDirectory, port, -1, -1, true, -1), start); + this(new InstanceSpec(tempDirectory, Math.max(0, port), -1, -1, true, -1), start); } /** @@ -123,13 +123,18 @@ public TestingServer(InstanceSpec spec, boolean start) throws Exception } /** - * Return the port being used + * Return the port being used or will be used. * * @return port + * @throws IllegalStateException if server is configured to bind to port 0 but not started */ public int getPort() { - return spec.getPort(); + int port = spec.getPort(); + if (port > 0) { + return port; + } + return testingZooKeeperServer.getLocalPort(); } /** @@ -186,9 +191,9 @@ public void close() throws IOException * Returns the connection string to use * * @return connection string + * @throws IllegalStateException if server is configured to bind to port 0 but not started */ - public String getConnectString() - { - return spec.getConnectString(); + public String getConnectString() { + return spec.getHostname() + ":" + getPort(); } } \ No newline at end of file diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java index 646cdc4f04..aa702e0c7b 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java @@ -51,8 +51,6 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace private volatile ServerCnxnFactory cnxnFactory; private volatile TestZooKeeperServer zkServer; private volatile ContainerManager containerManager; - private volatile QuorumConfigBuilder configBuilder; - private volatile int instanceIndex; private static final Timing timing = new Timing(); @@ -98,13 +96,8 @@ public void kill() } } - @Override - public QuorumPeerConfig getConfig() throws Exception { - if (configBuilder != null) { - return configBuilder.buildConfig(instanceIndex); - } - - return null; + TestZooKeeperServer getZkServer() { + return zkServer; } private void runFromConfig(QuorumPeerConfig config) throws Exception @@ -271,27 +264,26 @@ private void internalRunFromConfig(ServerConfig config) throws IOException } @Override - public void configure(QuorumConfigBuilder configBuilder, int instanceIndex) { - this.configBuilder = configBuilder; - this.instanceIndex = instanceIndex; - } - - @Override - public void start() { + public void start(QuorumPeerConfigBuilder configBuilder) { new Thread(() -> { try { - runFromConfig(getConfig()); + runFromConfig(configBuilder.buildConfig()); } catch ( Exception e ) { - log.error(String.format("From testing server (random state: %s) for instance: %s", String.valueOf(configBuilder.isFromRandom()), configBuilder.getInstanceSpec(instanceIndex)), e); + log.error(String.format("From testing server (random state: %s) for instance: %s", configBuilder.isFromRandom(), configBuilder.getInstanceSpec()), e); } }, "zk-main-thread").start(); blockUntilStarted(); } + @Override + public int getClientPort() { + return cnxnFactory == null ? -1 : cnxnFactory.getLocalPort(); + } + public static class TestZooKeeperServer extends ZooKeeperServer { private final FileTxnSnapLog txnLog; diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperServer.java b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperServer.java index 42d94fd4a0..f12c117c52 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperServer.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperServer.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.util.Collection; import java.util.concurrent.atomic.AtomicReference; + +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,11 +34,12 @@ public class TestingZooKeeperServer implements Closeable { private static final Logger log = LoggerFactory.getLogger(TestingZooKeeperServer.class); - private static final boolean hasZooKeeperServerEmbedded; + static boolean hasZooKeeperServerEmbedded; private final AtomicReference state = new AtomicReference<>(State.LATENT); private final QuorumConfigBuilder configBuilder; private final int thisInstanceIndex; + private int thisInstancePort; private volatile ZooKeeperMainFace main; static { @@ -71,6 +74,7 @@ public TestingZooKeeperServer(QuorumConfigBuilder configBuilder, int thisInstanc this.configBuilder = configBuilder; this.thisInstanceIndex = thisInstanceIndex; + this.thisInstancePort = configBuilder.getInstanceSpec(thisInstanceIndex).getPort(); main = createServerMain(); } @@ -163,7 +167,14 @@ public void start() throws Exception return; } - main.configure(configBuilder, thisInstanceIndex); - main.start(); + main.start(configBuilder.bindInstance(thisInstanceIndex, thisInstancePort)); + thisInstancePort = main.getClientPort(); + } + + public int getLocalPort() { + if (thisInstancePort == 0) { + throw new IllegalStateException("server is configured to bind to port 0 but not started"); + } + return thisInstancePort; } } \ No newline at end of file diff --git a/curator-test/src/main/java/org/apache/curator/test/ZooKeeperMainFace.java b/curator-test/src/main/java/org/apache/curator/test/ZooKeeperMainFace.java index 5b8521fe68..6f8120794e 100644 --- a/curator-test/src/main/java/org/apache/curator/test/ZooKeeperMainFace.java +++ b/curator-test/src/main/java/org/apache/curator/test/ZooKeeperMainFace.java @@ -19,15 +19,12 @@ package org.apache.curator.test; import java.io.Closeable; -import org.apache.zookeeper.server.quorum.QuorumPeerConfig; -public interface ZooKeeperMainFace extends Closeable +interface ZooKeeperMainFace extends Closeable { - void configure(QuorumConfigBuilder config, int instanceIndex) throws Exception; - - void start(); + void start(QuorumPeerConfigBuilder configBuilder); void kill(); - QuorumPeerConfig getConfig() throws Exception; + int getClientPort() throws Exception; } diff --git a/curator-test/src/main/java/org/apache/curator/test/ZooKeeperServerEmbeddedAdapter.java b/curator-test/src/main/java/org/apache/curator/test/ZooKeeperServerEmbeddedAdapter.java index 0e8aeb0763..ba7591bc83 100644 --- a/curator-test/src/main/java/org/apache/curator/test/ZooKeeperServerEmbeddedAdapter.java +++ b/curator-test/src/main/java/org/apache/curator/test/ZooKeeperServerEmbeddedAdapter.java @@ -19,12 +19,20 @@ package org.apache.curator.test; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.Properties; + +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServerMain; import org.apache.zookeeper.server.embedded.ZooKeeperServerEmbedded; +import org.apache.zookeeper.server.quorum.QuorumPeer; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.apache.zookeeper.server.quorum.QuorumPeerMain; +import org.apache.zookeeper.server.util.ConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,47 +41,97 @@ public class ZooKeeperServerEmbeddedAdapter implements ZooKeeperMainFace { private static final Duration DEFAULT_STARTUP_TIMEOUT = Duration.ofMinutes(1); private volatile ZooKeeperServerEmbedded zooKeeperEmbedded; - private volatile QuorumConfigBuilder configBuilder; - private volatile int instanceIndex; @Override - public void configure(QuorumConfigBuilder config, int instanceIndex) throws Exception { - this.configBuilder = config; - this.instanceIndex = instanceIndex; + public void start(QuorumPeerConfigBuilder configBuilder) { + try { + final Properties properties = configBuilder.buildProperties(); + properties.put("admin.enableServer", "false"); - final Properties properties = config.buildConfigProperties(instanceIndex); - properties.put("admin.enableServer", "false"); + final Path dataDir = Paths.get(properties.getProperty("dataDir")); + zooKeeperEmbedded = ZooKeeperServerEmbedded.builder() + .configuration(properties) + .baseDir(dataDir.getParent()) + .build(); + log.info("Configure ZooKeeperServerEmbeddedAdapter with properties: {}", properties); - final Path dataDir = Paths.get(properties.getProperty("dataDir")); - zooKeeperEmbedded = ZooKeeperServerEmbedded.builder() - .configuration(properties) - .baseDir(dataDir.getParent()) - .build(); - log.info("Configure ZooKeeperServerEmbeddedAdapter with properties: {}", properties); + // Before ZOOKEEPER-4303, there are issues when setting "clientPort" to 0: + // * It does not set "clientPortAddress" which causes ZooKeeper started with no + // server cnxn factory to serve client requests. + // * It uses "clientPortAddress" to construct connection string but not bound port. + // + // So here, we hijack start process to circumvent these if there is no fix applied. + // * Setup "clientPortAddress" if it is null. + // * Setup "clientPortAddress" with bound port after started if above step applied. + if (hijackClientPort(0)) { + zooKeeperEmbedded.start(DEFAULT_STARTUP_TIMEOUT.toMillis()); + int port = getServerCnxnFactory().getLocalPort(); + hijackClientPort(port); + } else { + zooKeeperEmbedded.start(DEFAULT_STARTUP_TIMEOUT.toMillis()); + } + } catch (Exception e) { + throw new FailedServerStartException(e); + } } @Override - public QuorumPeerConfig getConfig() throws Exception { - if (configBuilder != null) { - return configBuilder.buildConfig(instanceIndex); + public int getClientPort() throws Exception { + String address = zooKeeperEmbedded.getConnectionString(); + try { + String[] parts = ConfigUtils.getHostAndPort(address); + return Integer.parseInt(parts[1], 10); + } catch (Exception ex) { + throw new IllegalStateException("invalid connection string: " + address); } - - return null; } - @Override - public void start() { - if (zooKeeperEmbedded == null) { - throw new FailedServerStartException(new NullPointerException("zooKeeperEmbedded")); + private boolean hijackClientPort(int port) { + try { + Class clazz = Class.forName("org.apache.zookeeper.server.embedded.ZooKeeperServerEmbeddedImpl"); + Field configField = clazz.getDeclaredField("config"); + configField.setAccessible(true); + QuorumPeerConfig peerConfig = (QuorumPeerConfig) configField.get(zooKeeperEmbedded); + if (peerConfig.getClientPortAddress() == null || port != 0) { + Field addressField = QuorumPeerConfig.class.getDeclaredField("clientPortAddress"); + addressField.setAccessible(true); + addressField.set(peerConfig, new InetSocketAddress(port)); + return true; + } + } catch (Exception ignored) { + // swallow hijack failure to accommodate possible upstream changes } + return false; + } + public ServerCnxnFactory getServerCnxnFactory() { try { - zooKeeperEmbedded.start(DEFAULT_STARTUP_TIMEOUT.toMillis()); - } catch (Exception e) { - throw new FailedServerStartException(e); + Class clazz = Class.forName("org.apache.zookeeper.server.embedded.ZooKeeperServerEmbeddedImpl"); + Field clusterField = clazz.getDeclaredField("maincluster"); + clusterField.setAccessible(true); + QuorumPeerMain quorumPeerMain = (QuorumPeerMain) clusterField.get(zooKeeperEmbedded); + if (quorumPeerMain != null) { + Field quorumPeerField = QuorumPeerMain.class.getDeclaredField("quorumPeer"); + quorumPeerField.setAccessible(true); + QuorumPeer quorumPeer = (QuorumPeer) quorumPeerField.get(quorumPeerMain); + return getServerCnxnFactory(QuorumPeer.class, quorumPeer, "cnxnFactory"); + } + Field serverField = clazz.getDeclaredField("mainsingle"); + serverField.setAccessible(true); + ZooKeeperServerMain server = (ZooKeeperServerMain) serverField.get(zooKeeperEmbedded); + return getServerCnxnFactory(ZooKeeperServerMain.class, server, "cnxnFactory"); + } catch (ClassNotFoundException | NoSuchFieldException | IllegalAccessException ex) { + throw new IllegalStateException("zk server cnxn factory not found", ex); } } + static ServerCnxnFactory getServerCnxnFactory(Class clazz, Object obj, String fieldName) + throws NoSuchFieldException, IllegalAccessException { + Field cnxnFactoryField = clazz.getDeclaredField(fieldName); + cnxnFactoryField.setAccessible(true); + return (ServerCnxnFactory) cnxnFactoryField.get(obj); + } + @Override public void kill() { close(); diff --git a/curator-test/src/test/java/org/apache/curator/test/TestTestingServer.java b/curator-test/src/test/java/org/apache/curator/test/TestTestingServer.java index f16b2ac748..09ac447074 100644 --- a/curator-test/src/test/java/org/apache/curator/test/TestTestingServer.java +++ b/curator-test/src/test/java/org/apache/curator/test/TestTestingServer.java @@ -35,6 +35,8 @@ public class TestTestingServer { @Test public void setCustomTickTimeTest() throws Exception { + TestingZooKeeperServer.hasZooKeeperServerEmbedded = false; + final int defaultZkTickTime = ZooKeeperServer.DEFAULT_TICK_TIME; final int customTickMs; if (defaultZkTickTime > 0) { @@ -45,7 +47,8 @@ public void setCustomTickTimeTest() throws Exception { final InstanceSpec spec = new InstanceSpec(zkTmpDir, -1, -1, -1, true, -1, customTickMs, -1); final int zkTickTime; try (TestingServer testingServer = new TestingServer(spec, true)) { - zkTickTime = testingServer.getTestingZooKeeperServer().getMain().getConfig().getTickTime(); + TestingZooKeeperMain main = (TestingZooKeeperMain) testingServer.getTestingZooKeeperServer().getMain(); + zkTickTime = main.getZkServer().getTickTime(); } assertEquals(customTickMs, zkTickTime); }