Skip to content

Commit

Permalink
CURATOR-535: Fix client port conflict due to untrustworthy random por…
Browse files Browse the repository at this point in the history
…t allocation (#421)

This commit tries to solve port conflict for `TestingServer` if port is
unspecified(aka. `port <= 0`):
* Set `InstanceSpec.port` to 0 if port is unspecified.
* Save OS chosen bind port after started to maintain same port across
  restart.

Ideally, it should be possible to bootstrap `TestingCluster`(with unspecified ports)
too with help from `reconfig`. But there are difficulties since election port, quorum
port were not designed to be bound to `0` in ZooKeeper. `TestingServer` should be
enough for most cases.

Users should resort to other solutions(eg. container) if they got bored
by port conflict due to usages of `TestingCluster` and
`TestingServer.restart`.
  • Loading branch information
kezhuw committed Feb 21, 2023
1 parent 1a4149f commit 7e7c207
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 84 deletions.
Expand Up @@ -205,6 +205,10 @@ public int getQuorumPort()
return quorumPort;
}

/**
* @deprecated use {@link TestingServer#getConnectString()} or {@link TestingCluster#getConnectString()} instead
*/
@Deprecated
public String getConnectString()
{
return hostname + ":" + port;
Expand Down
Expand Up @@ -97,7 +97,13 @@ public void close()

public QuorumPeerConfig buildConfig(int instanceIndex) throws Exception
{
Properties properties = buildConfigProperties(instanceIndex);
InstanceSpec spec = instanceSpecs.get(instanceIndex);
return buildConfig(instanceIndex, spec.getPort());
}

public QuorumPeerConfig buildConfig(int instanceIndex, int instancePort) throws Exception
{
Properties properties = buildConfigProperties(instanceIndex, instancePort);
QuorumPeerConfig config = new QuorumPeerConfig()
{
{
Expand All @@ -112,6 +118,12 @@ public QuorumPeerConfig buildConfig(int instanceIndex) throws Exception
}

public Properties buildConfigProperties(int instanceIndex) throws Exception
{
InstanceSpec spec = instanceSpecs.get(instanceIndex);
return buildConfigProperties(instanceIndex, spec.getPort());
}

public Properties buildConfigProperties(int instanceIndex, int instancePort) throws Exception
{
boolean isCluster = (instanceSpecs.size() > 1);
InstanceSpec spec = instanceSpecs.get(instanceIndex);
Expand All @@ -128,7 +140,7 @@ public Properties buildConfigProperties(int instanceIndex) throws Exception
properties.setProperty("initLimit", "10");
properties.setProperty("syncLimit", "5");
properties.setProperty("dataDir", spec.getDataDirectory().getCanonicalPath());
properties.setProperty("clientPort", Integer.toString(spec.getPort()));
properties.setProperty("clientPort", Integer.toString(instancePort));
String tickTime = Integer.toString((spec.getTickTime() >= 0) ? spec.getTickTime() : new Timing2().tickTime());
properties.setProperty("tickTime", tickTime);
properties.setProperty("minSessionTimeout", tickTime);
Expand All @@ -142,7 +154,8 @@ public Properties buildConfigProperties(int instanceIndex) throws Exception
{
for ( InstanceSpec thisSpec : instanceSpecs )
{
properties.setProperty("server." + thisSpec.getServerId(), String.format("%s:%d:%d;%s:%d", thisSpec.getHostname(), thisSpec.getQuorumPort(), thisSpec.getElectionPort(), thisSpec.getHostname(), thisSpec.getPort()));
int clientPort = thisSpec == spec ? instancePort : thisSpec.getPort();
properties.setProperty("server." + thisSpec.getServerId(), String.format("%s:%d:%d;%s:%d", thisSpec.getHostname(), thisSpec.getQuorumPort(), thisSpec.getElectionPort(), thisSpec.getHostname(), clientPort));
}
}
Map<String,Object> customProperties = spec.getCustomProperties();
Expand All @@ -152,4 +165,8 @@ public Properties buildConfigProperties(int instanceIndex) throws Exception

return properties;
}

public QuorumPeerConfigBuilder bindInstance(int instanceIndex, int instancePort) {
return new QuorumPeerConfigBuilder(this, instanceIndex, instancePort);
}
}
@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.curator.test;

import org.apache.zookeeper.server.quorum.QuorumPeerConfig;

import java.util.Properties;

public class QuorumPeerConfigBuilder {
private final QuorumConfigBuilder configBuilder;
private final int instanceIndex;
private final int instancePort;

QuorumPeerConfigBuilder(QuorumConfigBuilder configBuilder, int instanceIndex, int instancePort) {
this.configBuilder = configBuilder;
this.instanceIndex = instanceIndex;
this.instancePort = instancePort;
}

public boolean isFromRandom() {
return configBuilder.isFromRandom();
}

public InstanceSpec getInstanceSpec() {
return configBuilder.getInstanceSpec(instanceIndex);
}

public QuorumPeerConfig buildConfig() throws Exception {
return configBuilder.buildConfig(instanceIndex, instancePort);
}

public Properties buildProperties() throws Exception {
return configBuilder.buildConfigProperties(instanceIndex, instancePort);
}
}
Expand Up @@ -22,7 +22,6 @@
import java.nio.channels.ServerSocketChannel;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerMain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,9 +31,6 @@ class TestingQuorumPeerMain extends QuorumPeerMain implements ZooKeeperMainFace
private static final Logger log = LoggerFactory.getLogger(TestingQuorumPeerMain.class);
private volatile boolean isClosed = false;

private volatile QuorumConfigBuilder configBuilder;
private volatile int instanceIndex;

@Override
public void kill()
{
Expand Down Expand Up @@ -92,30 +88,20 @@ private void blockUntilStarted()
}

@Override
public void configure(QuorumConfigBuilder configBuilder, int instanceIndex) {
this.configBuilder = configBuilder;
this.instanceIndex = instanceIndex;
}

@Override
public QuorumPeerConfig getConfig() throws Exception {
if (configBuilder != null) {
return configBuilder.buildConfig(instanceIndex);
}

return null;
}

@Override
public void start() {
public void start(QuorumPeerConfigBuilder configBuilder) {
new Thread(() -> {
try {
runFromConfig(getConfig());
runFromConfig(configBuilder.buildConfig());
} catch (Exception e) {
log.error("From testing server (random state: {}) for instance: {}", configBuilder.isFromRandom(), configBuilder.getInstanceSpec(instanceIndex), e);
log.error("From testing server (random state: {}) for instance: {}", configBuilder.isFromRandom(), configBuilder.getInstanceSpec(), e);
}
}).start();

blockUntilStarted();
}

@Override
public int getClientPort() {
return quorumPeer == null ? -1 : quorumPeer.getClientPort();
}
}
Expand Up @@ -101,7 +101,7 @@ public TestingServer(int port, File tempDirectory) throws Exception
*/
public TestingServer(int port, File tempDirectory, boolean start) throws Exception
{
this(new InstanceSpec(tempDirectory, port, -1, -1, true, -1), start);
this(new InstanceSpec(tempDirectory, Math.max(0, port), -1, -1, true, -1), start);
}

/**
Expand All @@ -123,13 +123,18 @@ public TestingServer(InstanceSpec spec, boolean start) throws Exception
}

/**
* Return the port being used
* Return the port being used or will be used.
*
* @return port
* @throws IllegalStateException if server is configured to bind to port 0 but not started
*/
public int getPort()
{
return spec.getPort();
int port = spec.getPort();
if (port > 0) {
return port;
}
return testingZooKeeperServer.getLocalPort();
}

/**
Expand Down Expand Up @@ -186,9 +191,9 @@ public void close() throws IOException
* Returns the connection string to use
*
* @return connection string
* @throws IllegalStateException if server is configured to bind to port 0 but not started
*/
public String getConnectString()
{
return spec.getConnectString();
public String getConnectString() {
return spec.getHostname() + ":" + getPort();
}
}
Expand Up @@ -51,8 +51,6 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace
private volatile ServerCnxnFactory cnxnFactory;
private volatile TestZooKeeperServer zkServer;
private volatile ContainerManager containerManager;
private volatile QuorumConfigBuilder configBuilder;
private volatile int instanceIndex;

private static final Timing timing = new Timing();

Expand Down Expand Up @@ -98,13 +96,8 @@ public void kill()
}
}

@Override
public QuorumPeerConfig getConfig() throws Exception {
if (configBuilder != null) {
return configBuilder.buildConfig(instanceIndex);
}

return null;
TestZooKeeperServer getZkServer() {
return zkServer;
}

private void runFromConfig(QuorumPeerConfig config) throws Exception
Expand Down Expand Up @@ -271,27 +264,26 @@ private void internalRunFromConfig(ServerConfig config) throws IOException
}

@Override
public void configure(QuorumConfigBuilder configBuilder, int instanceIndex) {
this.configBuilder = configBuilder;
this.instanceIndex = instanceIndex;
}

@Override
public void start() {
public void start(QuorumPeerConfigBuilder configBuilder) {
new Thread(() -> {
try
{
runFromConfig(getConfig());
runFromConfig(configBuilder.buildConfig());
}
catch ( Exception e )
{
log.error(String.format("From testing server (random state: %s) for instance: %s", String.valueOf(configBuilder.isFromRandom()), configBuilder.getInstanceSpec(instanceIndex)), e);
log.error(String.format("From testing server (random state: %s) for instance: %s", configBuilder.isFromRandom(), configBuilder.getInstanceSpec()), e);
}
}, "zk-main-thread").start();

blockUntilStarted();
}

@Override
public int getClientPort() {
return cnxnFactory == null ? -1 : cnxnFactory.getLocalPort();
}

public static class TestZooKeeperServer extends ZooKeeperServer
{
private final FileTxnSnapLog txnLog;
Expand Down
Expand Up @@ -23,6 +23,8 @@
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,11 +34,12 @@
public class TestingZooKeeperServer implements Closeable
{
private static final Logger log = LoggerFactory.getLogger(TestingZooKeeperServer.class);
private static final boolean hasZooKeeperServerEmbedded;
static boolean hasZooKeeperServerEmbedded;

private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
private final QuorumConfigBuilder configBuilder;
private final int thisInstanceIndex;
private int thisInstancePort;
private volatile ZooKeeperMainFace main;

static {
Expand Down Expand Up @@ -71,6 +74,7 @@ public TestingZooKeeperServer(QuorumConfigBuilder configBuilder, int thisInstanc

this.configBuilder = configBuilder;
this.thisInstanceIndex = thisInstanceIndex;
this.thisInstancePort = configBuilder.getInstanceSpec(thisInstanceIndex).getPort();
main = createServerMain();
}

Expand Down Expand Up @@ -163,7 +167,14 @@ public void start() throws Exception
return;
}

main.configure(configBuilder, thisInstanceIndex);
main.start();
main.start(configBuilder.bindInstance(thisInstanceIndex, thisInstancePort));
thisInstancePort = main.getClientPort();
}

public int getLocalPort() {
if (thisInstancePort == 0) {
throw new IllegalStateException("server is configured to bind to port 0 but not started");
}
return thisInstancePort;
}
}
Expand Up @@ -19,15 +19,12 @@
package org.apache.curator.test;

import java.io.Closeable;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;

public interface ZooKeeperMainFace extends Closeable
interface ZooKeeperMainFace extends Closeable
{
void configure(QuorumConfigBuilder config, int instanceIndex) throws Exception;

void start();
void start(QuorumPeerConfigBuilder configBuilder);

void kill();

QuorumPeerConfig getConfig() throws Exception;
int getClientPort() throws Exception;
}

0 comments on commit 7e7c207

Please sign in to comment.