Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CURATOR-535: Fix client port conflict due to untrustworthy random port allocation #421

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -205,6 +205,10 @@ public int getQuorumPort()
return quorumPort;
}

/**
* @deprecated use {@link TestingServer#getConnectString()} or {@link TestingCluster#getConnectString()} instead
*/
@Deprecated
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we already remove public QuorumPeerConfig getConfig() throws Exception { which breaks compatibility, what if we remove this deprecated mark and change the visibility to default (pacakge)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... and why do we remove ZooKeeperMainFace#getConfig?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this pr mixed two things: CURATOR-535 and bring back TestTestingServer#setCustomTickTimeTest for #383.

ZooKeeperMainFace#getConfig was introduced in #434, and its sole external usage in TestTestingServer#setCustomTickTimeTest undermines #383.

Comparing to public InstanceSpec, I think ZooKeeperMainFace is more like an internal concept. May be we can restrict visibility of ZooKeeperMainFace to package ? This should solve all above concerns if make sense to you.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kezhuw That sounds reasonable. And we can add getTickTime or something so that we don't have to change hasZooKeeperServerEmbedded.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed public from ZooKeeperMainFace. @tisonkun

getTickTime needs reflection as ZooKeeperServerEmbedded exposes no such info(ZOOKEEPER-4670). In previous version(8eda7d0), I exposed a get method in ZooKeeperMainFace to retrieve ServerCnxnFactory where both getTickTime and getClientPort could be easily constructed . But that approach depends heavily on reflection as QuorumPeerMain and ZooKeeperServerEmbedded exposes no such info.

public String getConnectString()
{
return hostname + ":" + port;
Expand Down
Expand Up @@ -97,7 +97,13 @@ public void close()

public QuorumPeerConfig buildConfig(int instanceIndex) throws Exception
{
Properties properties = buildConfigProperties(instanceIndex);
InstanceSpec spec = instanceSpecs.get(instanceIndex);
return buildConfig(instanceIndex, spec.getPort());
}

public QuorumPeerConfig buildConfig(int instanceIndex, int instancePort) throws Exception
{
Properties properties = buildConfigProperties(instanceIndex, instancePort);
QuorumPeerConfig config = new QuorumPeerConfig()
{
{
Expand All @@ -112,6 +118,12 @@ public QuorumPeerConfig buildConfig(int instanceIndex) throws Exception
}

public Properties buildConfigProperties(int instanceIndex) throws Exception
{
InstanceSpec spec = instanceSpecs.get(instanceIndex);
return buildConfigProperties(instanceIndex, spec.getPort());
}

public Properties buildConfigProperties(int instanceIndex, int instancePort) throws Exception
{
boolean isCluster = (instanceSpecs.size() > 1);
InstanceSpec spec = instanceSpecs.get(instanceIndex);
Expand All @@ -128,7 +140,7 @@ public Properties buildConfigProperties(int instanceIndex) throws Exception
properties.setProperty("initLimit", "10");
properties.setProperty("syncLimit", "5");
properties.setProperty("dataDir", spec.getDataDirectory().getCanonicalPath());
properties.setProperty("clientPort", Integer.toString(spec.getPort()));
properties.setProperty("clientPort", Integer.toString(instancePort));
String tickTime = Integer.toString((spec.getTickTime() >= 0) ? spec.getTickTime() : new Timing2().tickTime());
properties.setProperty("tickTime", tickTime);
properties.setProperty("minSessionTimeout", tickTime);
Expand All @@ -142,7 +154,8 @@ public Properties buildConfigProperties(int instanceIndex) throws Exception
{
for ( InstanceSpec thisSpec : instanceSpecs )
{
properties.setProperty("server." + thisSpec.getServerId(), String.format("%s:%d:%d;%s:%d", thisSpec.getHostname(), thisSpec.getQuorumPort(), thisSpec.getElectionPort(), thisSpec.getHostname(), thisSpec.getPort()));
int clientPort = thisSpec == spec ? instancePort : thisSpec.getPort();
properties.setProperty("server." + thisSpec.getServerId(), String.format("%s:%d:%d;%s:%d", thisSpec.getHostname(), thisSpec.getQuorumPort(), thisSpec.getElectionPort(), thisSpec.getHostname(), clientPort));
}
}
Map<String,Object> customProperties = spec.getCustomProperties();
Expand All @@ -152,4 +165,8 @@ public Properties buildConfigProperties(int instanceIndex) throws Exception

return properties;
}

public QuorumPeerConfigBuilder bindInstance(int instanceIndex, int instancePort) {
return new QuorumPeerConfigBuilder(this, instanceIndex, instancePort);
}
}
@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.curator.test;

import org.apache.zookeeper.server.quorum.QuorumPeerConfig;

import java.util.Properties;

public class QuorumPeerConfigBuilder {
private final QuorumConfigBuilder configBuilder;
private final int instanceIndex;
private final int instancePort;

QuorumPeerConfigBuilder(QuorumConfigBuilder configBuilder, int instanceIndex, int instancePort) {
this.configBuilder = configBuilder;
this.instanceIndex = instanceIndex;
this.instancePort = instancePort;
}

public boolean isFromRandom() {
return configBuilder.isFromRandom();
}

public InstanceSpec getInstanceSpec() {
return configBuilder.getInstanceSpec(instanceIndex);
}

public QuorumPeerConfig buildConfig() throws Exception {
return configBuilder.buildConfig(instanceIndex, instancePort);
}

public Properties buildProperties() throws Exception {
return configBuilder.buildConfigProperties(instanceIndex, instancePort);
}
}
Expand Up @@ -22,7 +22,6 @@
import java.nio.channels.ServerSocketChannel;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerMain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,9 +31,6 @@ class TestingQuorumPeerMain extends QuorumPeerMain implements ZooKeeperMainFace
private static final Logger log = LoggerFactory.getLogger(TestingQuorumPeerMain.class);
private volatile boolean isClosed = false;

private volatile QuorumConfigBuilder configBuilder;
private volatile int instanceIndex;

@Override
public void kill()
{
Expand Down Expand Up @@ -92,30 +88,20 @@ private void blockUntilStarted()
}

@Override
public void configure(QuorumConfigBuilder configBuilder, int instanceIndex) {
this.configBuilder = configBuilder;
this.instanceIndex = instanceIndex;
}

@Override
public QuorumPeerConfig getConfig() throws Exception {
if (configBuilder != null) {
return configBuilder.buildConfig(instanceIndex);
}

return null;
}

@Override
public void start() {
public void start(QuorumPeerConfigBuilder configBuilder) {
new Thread(() -> {
try {
runFromConfig(getConfig());
runFromConfig(configBuilder.buildConfig());
} catch (Exception e) {
log.error("From testing server (random state: {}) for instance: {}", configBuilder.isFromRandom(), configBuilder.getInstanceSpec(instanceIndex), e);
log.error("From testing server (random state: {}) for instance: {}", configBuilder.isFromRandom(), configBuilder.getInstanceSpec(), e);
}
}).start();

blockUntilStarted();
}

@Override
public int getClientPort() {
return quorumPeer == null ? -1 : quorumPeer.getClientPort();
}
}
Expand Up @@ -101,7 +101,7 @@ public TestingServer(int port, File tempDirectory) throws Exception
*/
public TestingServer(int port, File tempDirectory, boolean start) throws Exception
{
this(new InstanceSpec(tempDirectory, port, -1, -1, true, -1), start);
this(new InstanceSpec(tempDirectory, Math.max(0, port), -1, -1, true, -1), start);
}

/**
Expand All @@ -123,13 +123,18 @@ public TestingServer(InstanceSpec spec, boolean start) throws Exception
}

/**
* Return the port being used
* Return the port being used or will be used.
*
* @return port
* @throws IllegalStateException if server is configured to bind to port 0 but not started
*/
public int getPort()
{
return spec.getPort();
int port = spec.getPort();
if (port > 0) {
return port;
}
return testingZooKeeperServer.getLocalPort();
}

/**
Expand Down Expand Up @@ -186,9 +191,9 @@ public void close() throws IOException
* Returns the connection string to use
*
* @return connection string
* @throws IllegalStateException if server is configured to bind to port 0 but not started
*/
public String getConnectString()
{
return spec.getConnectString();
public String getConnectString() {
return spec.getHostname() + ":" + getPort();
}
}
Expand Up @@ -51,8 +51,6 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace
private volatile ServerCnxnFactory cnxnFactory;
private volatile TestZooKeeperServer zkServer;
private volatile ContainerManager containerManager;
private volatile QuorumConfigBuilder configBuilder;
private volatile int instanceIndex;

private static final Timing timing = new Timing();

Expand Down Expand Up @@ -98,13 +96,8 @@ public void kill()
}
}

@Override
public QuorumPeerConfig getConfig() throws Exception {
if (configBuilder != null) {
return configBuilder.buildConfig(instanceIndex);
}

return null;
TestZooKeeperServer getZkServer() {
return zkServer;
}

private void runFromConfig(QuorumPeerConfig config) throws Exception
Expand Down Expand Up @@ -271,27 +264,26 @@ private void internalRunFromConfig(ServerConfig config) throws IOException
}

@Override
public void configure(QuorumConfigBuilder configBuilder, int instanceIndex) {
this.configBuilder = configBuilder;
this.instanceIndex = instanceIndex;
}

@Override
public void start() {
public void start(QuorumPeerConfigBuilder configBuilder) {
new Thread(() -> {
try
{
runFromConfig(getConfig());
runFromConfig(configBuilder.buildConfig());
}
catch ( Exception e )
{
log.error(String.format("From testing server (random state: %s) for instance: %s", String.valueOf(configBuilder.isFromRandom()), configBuilder.getInstanceSpec(instanceIndex)), e);
log.error(String.format("From testing server (random state: %s) for instance: %s", configBuilder.isFromRandom(), configBuilder.getInstanceSpec()), e);
}
}, "zk-main-thread").start();

blockUntilStarted();
}

@Override
public int getClientPort() {
return cnxnFactory == null ? -1 : cnxnFactory.getLocalPort();
}

public static class TestZooKeeperServer extends ZooKeeperServer
{
private final FileTxnSnapLog txnLog;
Expand Down
Expand Up @@ -23,6 +23,8 @@
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,11 +34,12 @@
public class TestingZooKeeperServer implements Closeable
{
private static final Logger log = LoggerFactory.getLogger(TestingZooKeeperServer.class);
private static final boolean hasZooKeeperServerEmbedded;
static boolean hasZooKeeperServerEmbedded;

private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
private final QuorumConfigBuilder configBuilder;
private final int thisInstanceIndex;
private int thisInstancePort;
private volatile ZooKeeperMainFace main;

static {
Expand Down Expand Up @@ -71,6 +74,7 @@ public TestingZooKeeperServer(QuorumConfigBuilder configBuilder, int thisInstanc

this.configBuilder = configBuilder;
this.thisInstanceIndex = thisInstanceIndex;
this.thisInstancePort = configBuilder.getInstanceSpec(thisInstanceIndex).getPort();
main = createServerMain();
}

Expand Down Expand Up @@ -163,7 +167,14 @@ public void start() throws Exception
return;
}

main.configure(configBuilder, thisInstanceIndex);
main.start();
main.start(configBuilder.bindInstance(thisInstanceIndex, thisInstancePort));
thisInstancePort = main.getClientPort();
}

public int getLocalPort() {
if (thisInstancePort == 0) {
throw new IllegalStateException("server is configured to bind to port 0 but not started");
}
return thisInstancePort;
}
}
Expand Up @@ -19,15 +19,12 @@
package org.apache.curator.test;

import java.io.Closeable;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;

public interface ZooKeeperMainFace extends Closeable
interface ZooKeeperMainFace extends Closeable
{
void configure(QuorumConfigBuilder config, int instanceIndex) throws Exception;

void start();
void start(QuorumPeerConfigBuilder configBuilder);

void kill();

QuorumPeerConfig getConfig() throws Exception;
int getClientPort() throws Exception;
}