Skip to content

Commit

Permalink
get-free-port: prevent duplicate ports on Linux (#564)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjameswh committed May 17, 2024
1 parent 83d4685 commit 350cb2f
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 76 deletions.
19 changes: 18 additions & 1 deletion temporalcli/commands.server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string)
} else if err := opts.LogLevel.UnmarshalText([]byte(logLevel)); err != nil {
return fmt.Errorf("invalid log level %q: %w", logLevel, err)
}
if err := devserver.CheckPortFree(opts.FrontendIP, opts.FrontendPort); err != nil {
return fmt.Errorf("can't set frontend port %d: %w", opts.FrontendPort, err)
}
if err := devserver.CheckPortFree(opts.FrontendIP, opts.FrontendHTTPPort); err != nil {
return fmt.Errorf("can't set frontend HTTP port %d: %w", opts.FrontendHTTPPort, err)
}
// Setup UI
if !t.Headless {
opts.UIIP, opts.UIPort = t.Ip, t.UiPort
Expand All @@ -48,6 +54,13 @@ func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string)
}
if opts.UIPort == 0 {
opts.UIPort = t.Port + 1000
if err := devserver.CheckPortFree(opts.UIIP, opts.UIPort); err != nil {
return fmt.Errorf("can't use default UI port %d (%d + 1000): %w", opts.UIPort, t.Port, err)
}
} else {
if err := devserver.CheckPortFree(opts.UIIP, t.Port); err != nil {
return fmt.Errorf("can't set UI port %d: %w", opts.UIPort, err)
}
}
opts.UIAssetPath, opts.UICodecEndpoint = t.UiAssetPath, t.UiCodecEndpoint
}
Expand Down Expand Up @@ -87,7 +100,11 @@ func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string)
}
// Grab a free port for metrics ahead-of-time so we know what port is selected
if opts.MetricsPort == 0 {
opts.MetricsPort = devserver.MustGetFreePort()
opts.MetricsPort = devserver.MustGetFreePort(t.Ip)
} else {
if err := devserver.CheckPortFree(t.Ip, opts.MetricsPort); err != nil {
return fmt.Errorf("can't set metrics port %d: %w", opts.MetricsPort, err)
}
}

// Start, wait for context complete, then stop
Expand Down
85 changes: 83 additions & 2 deletions temporalcli/commands.server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package temporalcli_test
import (
"context"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -21,7 +23,7 @@ func TestServer_StartDev_Simple(t *testing.T) {
defer h.Close()

// Start in background, then wait for client to be able to connect
port := strconv.Itoa(devserver.MustGetFreePort())
port := strconv.Itoa(devserver.MustGetFreePort("127.0.0.1"))
resCh := make(chan *CommandResult, 1)
// TODO(cretz): Remove --headless when
// https://github.com/temporalio/ui/issues/1773 fixed
Expand All @@ -38,7 +40,7 @@ func TestServer_StartDev_Simple(t *testing.T) {
}
var err error
cl, err = client.Dial(client.Options{HostPort: "127.0.0.1:" + port})
require.NoError(t, err)
assert.NoError(t, err)
}, 3*time.Second, 200*time.Millisecond)
defer cl.Close()

Expand All @@ -60,3 +62,82 @@ func TestServer_StartDev_Simple(t *testing.T) {
h.NoError(res.Err)
}
}

func TestServer_StartDev_ConcurrentStarts(t *testing.T) {
startOne := func() {
h := NewCommandHarness(t)
defer h.Close()

// Start in background, then wait for client to be able to connect
port := strconv.Itoa(devserver.MustGetFreePort("127.0.0.1"))
resCh := make(chan *CommandResult, 1)
go func() {
resCh <- h.Execute("server", "start-dev", "-p", port, "--headless", "--log-level", "never")
}()

// Try to connect for a bit while checking for error
var cl client.Client
h.EventuallyWithT(func(t *assert.CollectT) {
select {
case res := <-resCh:
require.NoError(t, res.Err)
require.Fail(t, "got early server result")
default:
}
var err error
cl, err = client.Dial(client.Options{HostPort: "127.0.0.1:" + port, Logger: testLogger{t: h.t}})
assert.NoError(t, err)
}, 3*time.Second, 200*time.Millisecond)
defer cl.Close()

// Send an interrupt by cancelling context
h.CancelContext()

// FIXME: We should technically wait for server cleanup, but this is
// slowing down the test considerably, presumably due to the issue fixed
// in https://github.com/temporalio/temporal/pull/5459. Uncomment the
// following code when the server dependency is updated to 1.24.0.
//
// select {
// case <-time.After(20 * time.Second):
// h.Fail("didn't cleanup after 20 seconds")
// case res := <-resCh:
// h.NoError(res.Err)
// }
}

// Start 80 dev server instances, with 8 concurrent executions
instanceCounter := atomic.Int32{}
instanceCounter.Store(80)
wg := &sync.WaitGroup{}
for i := 0; i < 8; i++ {
wg.Add(1)
go func() {
for instanceCounter.Add(-1) >= 0 {
startOne()
}
wg.Done()
}()
}
wg.Wait()
}

type testLogger struct {
t *testing.T
}

func (l testLogger) Debug(msg string, keysAndValues ...interface{}) {
l.t.Logf("DEBUG: "+msg, keysAndValues...)
}

func (l testLogger) Info(msg string, keysAndValues ...interface{}) {
l.t.Logf("INFO: "+msg, keysAndValues...)
}

func (l testLogger) Warn(msg string, keysAndValues ...interface{}) {
l.t.Logf("WARN: "+msg, keysAndValues...)
}

func (l testLogger) Error(msg string, keysAndValues ...interface{}) {
l.t.Logf("ERROR: "+msg, keysAndValues...)
}
2 changes: 1 addition & 1 deletion temporalcli/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func StartDevServer(t *testing.T, options DevServerOptions) *DevServer {
d.Options.FrontendIP = "127.0.0.1"
}
if d.Options.FrontendPort == 0 {
d.Options.FrontendPort = devserver.MustGetFreePort()
d.Options.FrontendPort = devserver.MustGetFreePort(d.Options.FrontendIP)
}
if len(d.Options.Namespaces) == 0 {
d.Options.Namespaces = []string{
Expand Down
135 changes: 73 additions & 62 deletions temporalcli/devserver/freeport.go
Original file line number Diff line number Diff line change
@@ -1,84 +1,95 @@
// The MIT License
//
// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Copyright (c) 2021 Datadog, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package devserver

import (
"fmt"
"net"
"runtime"
)

// Modified from https://github.com/phayes/freeport/blob/95f893ade6f232a5f1511d61735d89b1ae2df543/freeport.go

func MustGetFreePort() int {
p := NewPortProvider()
defer p.Close()
return p.MustGetFreePort()
}

func NewPortProvider() *PortProvider {
return &PortProvider{}
}

type PortProvider struct {
listeners []*net.TCPListener
}

// GetFreePort asks the kernel for a free open port that is ready to use.
func (p *PortProvider) GetFreePort() (int, error) {
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
// Returns a TCP port that is available to listen on, for the given (local) host.
//
// This works by binding a new TCP socket on port 0, which requests the OS to
// allocate a free port. There is no strict guarantee that the port will remain
// available after this function returns, but it should be safe to assume that
// a given port will not be allocated again to any process on this machine
// within a few seconds.
//
// On Unix-based systems, binding to the port returned by this function requires
// setting the `SO_REUSEADDR` socket option (Go already does that by default,
// but other languages may not); otherwise, the OS may fail with a message such
// as "address already in use". Windows default behavior is already appropriate
// in this regard; on that platform, `SO_REUSEADDR` has a different meaning and
// should not be set (setting it may have unpredictable consequences).
func GetFreePort(host string) (int, error) {
l, err := net.Listen("tcp", host+":0")
if err != nil {
if addr, err = net.ResolveTCPAddr("tcp6", "[::1]:0"); err != nil {
panic(fmt.Sprintf("temporal: failed to get free port: %v", err))
}
return 0, fmt.Errorf("failed to assign a free port: %v", err)
}
defer l.Close()
port := l.Addr().(*net.TCPAddr).Port

l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, err
// On Linux and some BSD variants, ephemeral ports are randomized, and may
// consequently repeat within a short time frame after the listenning end
// has been closed. To avoid this, we make a connection to the port, then
// close that connection from the server's side (this is very important),
// which puts the connection in TIME_WAIT state for some time (by default,
// 60s on Linux). While it remains in that state, the OS will not reallocate
// that port number for bind(:0) syscalls, yet we are not prevented from
// explicitly binding to it (thanks to SO_REUSEADDR).
//
// On macOS and Windows, the above technique is not necessary, as the OS
// allocates ephemeral ports sequentially, meaning a port number will only
// be reused after the entire range has been exhausted. Quite the opposite,
// given that these OSes use a significantly smaller range for ephemeral
// ports, making an extra connection just to reserve a port might actually
// be harmful (by hastening ephemeral port exhaustion).
if runtime.GOOS != "darwin" && runtime.GOOS != "windows" {
r, err := net.DialTCP("tcp", nil, l.Addr().(*net.TCPAddr))
if err != nil {
return 0, fmt.Errorf("failed to assign a free port: %v", err)
}
c, err := l.Accept()
if err != nil {
return 0, fmt.Errorf("failed to assign a free port: %v", err)
}
// Closing the socket from the server side
c.Close()
defer r.Close()
}

p.listeners = append(p.listeners, l)

return l.Addr().(*net.TCPAddr).Port, nil
return port, nil
}

func (p *PortProvider) MustGetFreePort() int {
port, err := p.GetFreePort()
// Returns a TCP port that is available to listen on, for the given (local)
// host; panics if no port is available.
//
// This works by binding a new TCP socket on port 0, which requests the OS to
// allocate a free port. There is no strict guarantee that the port will remain
// available after this function returns, but it should be safe to assume that
// a given port will not be allocated again to any process on this machine
// within a few seconds.
//
// On Unix-based systems, binding to the port returned by this function requires
// setting the `SO_REUSEADDR` socket option (Go already does that by default,
// but other languages may not); otherwise, the OS may fail with a message such
// as "address already in use". Windows default behavior is already appropriate
// in this regard; on that platform, `SO_REUSEADDR` has a different meaning and
// should not be set (setting it may have unpredictable consequences).
func MustGetFreePort(host string) int {
port, err := GetFreePort(host)
if err != nil {
panic(err)
panic(fmt.Errorf("failed assigning ephemeral port: %w", err))
}
return port
}

func (p *PortProvider) Close() error {
for _, l := range p.listeners {
if err := l.Close(); err != nil {
return err
}
// Asserts that the given TCP port is available to listen on, for the given
// (local) host; return an error if it is not.
func CheckPortFree(host string, port int) error {
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
return err
}
l.Close()
return nil
}
69 changes: 69 additions & 0 deletions temporalcli/devserver/freeport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package devserver_test

import (
"fmt"
"net"
"testing"

"github.com/temporalio/cli/temporalcli/devserver"
)

func TestFreePort_NoDouble(t *testing.T) {
host := "127.0.0.1"
portSet := make(map[int]bool)

for i := 0; i < 2000; i++ {
p, err := devserver.GetFreePort(host)
if err != nil {
t.Fatalf("Error: %s", err)
break
}

if _, exists := portSet[p]; exists {
t.Fatalf("Port %d has been assigned more than once", p)
}

// Add port to the set
portSet[p] = true
}
}

func TestFreePort_CanBindImmediatelySameProcess(t *testing.T) {
host := "127.0.0.1"

for i := 0; i < 500; i++ {
p, err := devserver.GetFreePort(host)
if err != nil {
t.Fatalf("Error: %s", err)
break
}
err = tryListenAndDialOn(host, p)
if err != nil {
t.Fatalf("Error: %s", err)
break
}
}
}

// This function is used as part of unit tests, to ensure that the port
func tryListenAndDialOn(host string, port int) error {
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
return err
}
defer l.Close()

r, err := net.DialTCP("tcp", nil, l.Addr().(*net.TCPAddr))
if err != nil {
panic(err)
}
defer r.Close()

c, err := l.Accept()
if err != nil {
panic(err)
}
defer c.Close()

return nil
}

0 comments on commit 350cb2f

Please sign in to comment.