Skip to content

Commit

Permalink
Allow non-hardcoded ports for multinode cluster
Browse files Browse the repository at this point in the history
This required the join and init args to be computed on the fly, since we
can no longer infer them until after a node has started.
  • Loading branch information
rafiss committed Mar 15, 2023
1 parent d51b7cd commit 0361cb3
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 27 deletions.
32 changes: 10 additions & 22 deletions testserver/testserver.go
Expand Up @@ -419,19 +419,14 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
serverArgs.cockroachBinary = customBinaryEnv
}

// For backwards compatibility, in the 3 node case where no args are
// specified, default to ports 26257, 26258, 26259.
if serverArgs.numNodes == 3 && len(serverArgs.listenAddrPorts) == 0 {
serverArgs.listenAddrPorts = []int{26257, 26258, 26259}
} else if serverArgs.numNodes != 1 && len(serverArgs.listenAddrPorts) != serverArgs.numNodes {
if len(serverArgs.listenAddrPorts) == 0 {
serverArgs.listenAddrPorts = make([]int, serverArgs.numNodes)
}
if serverArgs.numNodes != 1 && len(serverArgs.listenAddrPorts) != serverArgs.numNodes {
panic(fmt.Sprintf("need to specify a port for each node using AddListenAddrPortOpt, got %d nodes, need %d ports",
serverArgs.numNodes, len(serverArgs.listenAddrPorts)))
}

if len(serverArgs.listenAddrPorts) == 0 {
serverArgs.listenAddrPorts = []int{0}
}

var err error
if serverArgs.cockroachBinary != "" {
log.Printf("Using custom cockroach binary: %s", serverArgs.cockroachBinary)
Expand Down Expand Up @@ -522,13 +517,6 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
}

nodes := make([]nodeInfo, serverArgs.numNodes)
var initArgs []string
joinAddrs := make([]string, 3)
hostPort := serverArgs.listenAddrPorts[0]
for i, port := range serverArgs.listenAddrPorts {
joinAddrs[i] = fmt.Sprintf("localhost:%d", port)
}

if len(serverArgs.httpPorts) == 0 {
serverArgs.httpPorts = make([]int, serverArgs.numNodes)
}
Expand All @@ -551,7 +539,6 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
nodes[i].listeningURLFile = filepath.Join(nodeBaseDir, "listen-url")
nodes[i].state = stateNew
if serverArgs.numNodes > 1 {
joinArg := fmt.Sprintf("--join=%s", strings.Join(joinAddrs, ","))
nodes[i].startCmdArgs = []string{
serverArgs.cockroachBinary,
startCmd,
Expand All @@ -568,7 +555,6 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
serverArgs.httpPorts[i],
),
"--listening-url-file=" + nodes[i].listeningURLFile,
joinArg,
"--external-io-dir=" + serverArgs.externalIODir,
}
} else {
Expand All @@ -589,11 +575,10 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {

// We only need initArgs if we're creating a testserver
// with multiple nodes.
initArgs = []string{
initArgs := []string{
serverArgs.cockroachBinary,
"init",
secureOpt,
fmt.Sprintf("--host=localhost:%d", hostPort),
}

states := make([]int, serverArgs.numNodes)
Expand Down Expand Up @@ -867,7 +852,10 @@ func (ts *testServerImpl) Stop() {
}

func (ts *testServerImpl) CockroachInit() error {
ts.initCmd = exec.Command(ts.initCmdArgs[0], ts.initCmdArgs[1:]...)
// The port must be computed here, since it may not be known until after
// a node is started (if the listen port is 0).
args := append(ts.initCmdArgs, fmt.Sprintf("--host=localhost:%s", ts.PGURL().Port()))
ts.initCmd = exec.Command(args[0], args[1:]...)
ts.initCmd.Env = []string{
"COCKROACH_MAX_OFFSET=1ns",
"COCKROACH_TRUST_CLIENT_PROVIDED_SQL_REMOTE_ADDR=true",
Expand All @@ -880,7 +868,7 @@ func (ts *testServerImpl) CockroachInit() error {

err := ts.initCmd.Start()
if ts.initCmd.Process != nil {
log.Printf("process %d started: %s", ts.initCmd.Process.Pid, strings.Join(ts.initCmdArgs, " "))
log.Printf("process %d started: %s", ts.initCmd.Process.Pid, strings.Join(args, " "))
}
if err != nil {
return err
Expand Down
3 changes: 0 additions & 3 deletions testserver/testserver_test.go
Expand Up @@ -630,9 +630,6 @@ func TestUpgradeNode(t *testing.T) {
testserver.CockroachBinaryPathOpt(absPathOldBinary),
testserver.UpgradeCockroachBinaryPathOpt(absPathNewBinary),
testserver.StoreOnDiskOpt(),
testserver.AddListenAddrPortOpt(26257),
testserver.AddListenAddrPortOpt(26258),
testserver.AddListenAddrPortOpt(26259),
)
require.NoError(t, err)
defer ts.Stop()
Expand Down
37 changes: 35 additions & 2 deletions testserver/testservernode.go
Expand Up @@ -37,6 +37,9 @@ func (ts *testServerImpl) StopNode(nodeNum int) error {
return err
}
}
// Reset the pgURL, since it could change if the node is started later;
// specifically, if the listen port is 0 then the port will change.
ts.pgURL[nodeNum] = pgURLChan{}

return nil
}
Expand All @@ -47,7 +50,37 @@ func (ts *testServerImpl) StartNode(i int) error {
return fmt.Errorf("node %d already running", i)
}
ts.mu.RUnlock()
ts.nodes[i].startCmd = exec.Command(ts.nodes[i].startCmdArgs[0], ts.nodes[i].startCmdArgs[1:]...)

// We need to compute the join addresses here. since if the listen port is
// 0, then the actual port will not be known until a node is started.
var joinAddrs []string
for otherNodeID := range ts.nodes {
if i == otherNodeID {
continue
}
if otherNodeID < len(ts.serverArgs.listenAddrPorts) {
joinAddrs = append(joinAddrs, fmt.Sprintf("localhost:%d", ts.serverArgs.listenAddrPorts[otherNodeID]))
continue
}
select {
case <-ts.pgURL[otherNodeID].set:
joinAddrs = append(joinAddrs, fmt.Sprintf("localhost:%s", ts.pgURL[otherNodeID].u.Port()))
default:
// If the other node hasn't started yet, don't add the join arg.
}
}
joinArg := fmt.Sprintf("--join=%s", strings.Join(joinAddrs, ","))

args := ts.nodes[i].startCmdArgs
if len(ts.nodes) > 1 {
if len(joinAddrs) == 0 {
// The start command always requires a --join arg, so we fake one
// if we don't have any yet.
joinArg = fmt.Sprintf("--join=localhost:0")
}
args = append(args, joinArg)
}
ts.nodes[i].startCmd = exec.Command(args[0], args[1:]...)

currCmd := ts.nodes[i].startCmd
currCmd.Env = []string{
Expand Down Expand Up @@ -86,7 +119,7 @@ func (ts *testServerImpl) StartNode(i int) error {
log.Printf("executing: %s", currCmd)
err := currCmd.Start()
if currCmd.Process != nil {
log.Printf("process %d started: %s", currCmd.Process.Pid, strings.Join(ts.nodes[i].startCmdArgs, " "))
log.Printf("process %d started: %s", currCmd.Process.Pid, strings.Join(args, " "))
}
if err != nil {
log.Print(err.Error())
Expand Down

0 comments on commit 0361cb3

Please sign in to comment.