Skip to content

Commit

Permalink
CURATOR-535: Fix client port conflict due to untrustworthy random por…
Browse files Browse the repository at this point in the history
…t allocation

This commit tries to solve port conflict for `TestingServer` if port is
unspecified(aka. `port <= 0`):
* Set `InstanceSpec.port` to 0 if port is unspecified.
* Save OS chosen bind port after started to maintain same port across
  restart.

Ideally, it should be possible to bootstrap `TestingCluster`(with unspecified ports)
too with help from `reconfig`. But there are difficulties since election port, quorum
port were not designed to be bound to `0` in ZooKeeper. `TestingServer` should be
enough for most cases.

Users should resort to other solutions(eg. container) if they got bored
by port conflict due to usages of `TestingCluster` and
`TestingServer.restart`.
  • Loading branch information
kezhuw committed Jun 22, 2022
1 parent 15a9f03 commit c82a8de
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 10 deletions.
Expand Up @@ -206,6 +206,10 @@ public int getQuorumPort()
return quorumPort;
}

/**
* @deprecated use {@link TestingServer#getConnectString()} or {@link TestingCluster#getConnectString()} instead
*/
@Deprecated
public String getConnectString()
{
return hostname + ":" + port;
Expand Down
Expand Up @@ -95,7 +95,12 @@ public void close()
}
}

public QuorumPeerConfig buildConfig(int instanceIndex) throws Exception
public QuorumPeerConfig buildConfig(int instanceIndex) throws Exception {
InstanceSpec spec = instanceSpecs.get(instanceIndex);
return buildConfig(instanceIndex, spec.getPort());
}

QuorumPeerConfig buildConfig(int instanceIndex, int instancePort) throws Exception
{
boolean isCluster = (instanceSpecs.size() > 1);
InstanceSpec spec = instanceSpecs.get(instanceIndex);
Expand All @@ -109,7 +114,7 @@ public QuorumPeerConfig buildConfig(int instanceIndex) throws Exception
properties.setProperty("initLimit", "10");
properties.setProperty("syncLimit", "5");
properties.setProperty("dataDir", spec.getDataDirectory().getCanonicalPath());
properties.setProperty("clientPort", Integer.toString(spec.getPort()));
properties.setProperty("clientPort", Integer.toString(instancePort));
String tickTime = Integer.toString((spec.getTickTime() >= 0) ? spec.getTickTime() : new Timing2().tickTime());
properties.setProperty("tickTime", tickTime);
properties.setProperty("minSessionTimeout", tickTime);
Expand All @@ -123,7 +128,8 @@ public QuorumPeerConfig buildConfig(int instanceIndex) throws Exception
{
for ( InstanceSpec thisSpec : instanceSpecs )
{
properties.setProperty("server." + thisSpec.getServerId(), String.format("%s:%d:%d;%s:%d", thisSpec.getHostname(), thisSpec.getQuorumPort(), thisSpec.getElectionPort(), thisSpec.getHostname(), thisSpec.getPort()));
int clientPort = thisSpec == spec ? instancePort : thisSpec.getPort();
properties.setProperty("server." + thisSpec.getServerId(), String.format("%s:%d:%d;%s:%d", thisSpec.getHostname(), thisSpec.getQuorumPort(), thisSpec.getElectionPort(), thisSpec.getHostname(), clientPort));
}
}
Map<String,Object> customProperties = spec.getCustomProperties();
Expand Down
Expand Up @@ -97,7 +97,7 @@ public TestingServer(int port, File tempDirectory) throws Exception
*/
public TestingServer(int port, File tempDirectory, boolean start) throws Exception
{
this(new InstanceSpec(tempDirectory, port, -1, -1, true, -1), start);
this(new InstanceSpec(tempDirectory, Math.max(0, port), -1, -1, true, -1), start);
}

/**
Expand All @@ -119,13 +119,18 @@ public TestingServer(InstanceSpec spec, boolean start) throws Exception
}

/**
* Return the port being used
* Return the port being used or will be used.
*
* @apiNote the port will be 0 if server is not started and {@link InstanceSpec#getPort()} is 0
* @return port
*/
public int getPort()
{
return spec.getPort();
int port = spec.getPort();
if (port > 0) {
return port;
}
return testingZooKeeperServer.getLocalPort();
}

/**
Expand Down Expand Up @@ -181,10 +186,10 @@ public void close() throws IOException
/**
* Returns the connection string to use
*
* @apiNote the connection string will have port 0 if server is not started and {@link InstanceSpec#getPort()} is 0
* @return connection string
*/
public String getConnectString()
{
return spec.getConnectString();
public String getConnectString() {
return spec.getHostname() + ":" + getPort();
}
}
Expand Up @@ -74,6 +74,10 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace
MAX_WAIT_MS = Math.max((int)elapsed * 2, 1000);
}

ServerCnxnFactory getCnxnFactory() {
return cnxnFactory;
}

@Override
public void kill()
{
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.apache.curator.test;

import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
Expand All @@ -39,6 +40,7 @@ public class TestingZooKeeperServer implements Closeable
private final int thisInstanceIndex;
private volatile ZooKeeperMainFace main;
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private int thisInstancePort;

private enum State
{
Expand All @@ -56,6 +58,7 @@ public TestingZooKeeperServer(QuorumConfigBuilder configBuilder, int thisInstanc

this.configBuilder = configBuilder;
this.thisInstanceIndex = thisInstanceIndex;
this.thisInstancePort = configBuilder.getInstanceSpec(thisInstanceIndex).getPort();
main = isCluster() ? new TestingQuorumPeerMain() : new TestingZooKeeperMain();
}

Expand Down Expand Up @@ -154,7 +157,7 @@ public void run()
{
try
{
QuorumPeerConfig config = configBuilder.buildConfig(thisInstanceIndex);
QuorumPeerConfig config = configBuilder.buildConfig(thisInstanceIndex, thisInstancePort);
main.runFromConfig(config);
}
catch ( Exception e )
Expand All @@ -165,5 +168,16 @@ public void run()
}).start();

main.blockUntilStarted();

// Save bind port across restart.
if (thisInstancePort == 0 && !isCluster()) {
TestingZooKeeperMain serverMain = (TestingZooKeeperMain) main;
ServerCnxnFactory cnxnFactory = serverMain.getCnxnFactory();
thisInstancePort = cnxnFactory.getLocalPort();
}
}

public int getLocalPort() {
return thisInstancePort;
}
}

0 comments on commit c82a8de

Please sign in to comment.