From 58c17f696263fc4a35ef5de198f99aa1717e7b87 Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Mon, 21 Jun 2021 20:11:08 -0700 Subject: [PATCH] :seedling: addr.Suggest should lock a file instead of memory Envtest is often running in parallel when using go test, which spins up multiple indipendent go test processes that cannot talk to each other. The address suggestion code, mostly used to find an open port, can cause port collisions and a race condition between different envtests running at the same time. This change switches the internal memory to use a file based system that creates a file. Signed-off-by: Vince Prignano --- go.mod | 1 + hack/check-everything.sh | 10 ++- hack/verify.sh | 8 +- pkg/internal/flock/doc.go | 21 +++++ pkg/internal/flock/flock_other.go | 24 ++++++ pkg/internal/flock/flock_unix.go | 35 ++++++++ pkg/internal/testing/addr/manager.go | 103 +++++++++++++++--------- pkg/internal/testing/process/process.go | 10 ++- 8 files changed, 166 insertions(+), 46 deletions(-) create mode 100644 pkg/internal/flock/doc.go create mode 100644 pkg/internal/flock/flock_other.go create mode 100644 pkg/internal/flock/flock_unix.go diff --git a/go.mod b/go.mod index 1423d76de0..0ce3b6c132 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/prometheus/client_model v0.2.0 go.uber.org/goleak v1.1.10 go.uber.org/zap v1.17.0 + golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6 gomodules.xyz/jsonpatch/v2 v2.2.0 google.golang.org/appengine v1.6.7 // indirect diff --git a/hack/check-everything.sh b/hack/check-everything.sh index b6d3472bcc..08fadee7e0 100755 --- a/hack/check-everything.sh +++ b/hack/check-everything.sh @@ -24,9 +24,11 @@ source ${hack_dir}/common.sh tmp_root=/tmp kb_root_dir=$tmp_root/kubebuilder -ENVTEST_K8S_VERSION=${ENVTEST_K8S_VERSION:-"1.21.2"} +# Run verification scripts. +${hack_dir}/verify.sh -# set up envtest tools if necessary +# Envtest. +ENVTEST_K8S_VERSION=${ENVTEST_K8S_VERSION:-"1.21.2"} header_text "installing envtest tools@${ENVTEST_K8S_VERSION} with setup-envtest if necessary" tmp_bin=/tmp/cr-tests-bin @@ -35,9 +37,9 @@ tmp_bin=/tmp/cr-tests-bin cd ${hack_dir}/../tools/setup-envtest GOBIN=${tmp_bin} go install . ) -source <(${tmp_bin}/setup-envtest use --use-env -p env ${ENVTEST_K8S_VERSION}) +export KUBEBUILDER_ASSETS="$(${tmp_bin}/setup-envtest use --use-env -p path "${ENVTEST_K8S_VERSION}")" -${hack_dir}/verify.sh +# Run tests. ${hack_dir}/test-all.sh header_text "confirming examples compile (via go install)" diff --git a/hack/verify.sh b/hack/verify.sh index e44299797c..85006e3f06 100755 --- a/hack/verify.sh +++ b/hack/verify.sh @@ -27,5 +27,9 @@ make generate header_text "running golangci-lint" make lint -header_text "verifying modules" -make modules verify-modules +# Only run module verification in CI, otherwise updating +# go module locally (which is a valid operation) causes `make test` to fail. +if [[ -n ${CI} ]]; then + header_text "verifying modules" + make modules verify-modules +fi diff --git a/pkg/internal/flock/doc.go b/pkg/internal/flock/doc.go new file mode 100644 index 0000000000..11e39823ed --- /dev/null +++ b/pkg/internal/flock/doc.go @@ -0,0 +1,21 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package flock is copied from k8s.io/kubernetes/pkg/util/flock to avoid +// importing k8s.io/kubernetes as a dependency. +// +// Provides file locking functionalities on unix systems. +package flock diff --git a/pkg/internal/flock/flock_other.go b/pkg/internal/flock/flock_other.go new file mode 100644 index 0000000000..069a5b3a2c --- /dev/null +++ b/pkg/internal/flock/flock_other.go @@ -0,0 +1,24 @@ +// +build !linux,!darwin,!freebsd,!openbsd,!netbsd,!dragonfly + +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flock + +// Acquire is not implemented on non-unix systems. +func Acquire(path string) error { + return nil +} diff --git a/pkg/internal/flock/flock_unix.go b/pkg/internal/flock/flock_unix.go new file mode 100644 index 0000000000..3dae621b73 --- /dev/null +++ b/pkg/internal/flock/flock_unix.go @@ -0,0 +1,35 @@ +// +build linux darwin freebsd openbsd netbsd dragonfly + +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flock + +import "golang.org/x/sys/unix" + +// Acquire acquires a lock on a file for the duration of the process. This method +// is reentrant. +func Acquire(path string) error { + fd, err := unix.Open(path, unix.O_CREAT|unix.O_RDWR|unix.O_CLOEXEC, 0600) + if err != nil { + return err + } + + // We don't need to close the fd since we should hold + // it until the process exits. + + return unix.Flock(fd, unix.LOCK_EX) +} diff --git a/pkg/internal/testing/addr/manager.go b/pkg/internal/testing/addr/manager.go index 15f36fe2d0..2326af1569 100644 --- a/pkg/internal/testing/addr/manager.go +++ b/pkg/internal/testing/addr/manager.go @@ -18,78 +18,109 @@ package addr import ( "fmt" + "io/fs" "net" - "sync" + "os" + "path/filepath" + "strings" "time" + + "sigs.k8s.io/controller-runtime/pkg/internal/flock" ) // TODO(directxman12): interface / release functionality for external port managers const ( - portReserveTime = 1 * time.Minute + portReserveTime = 10 * time.Minute portConflictRetry = 100 + portFilePrefix = "port-" +) + +var ( + cacheDir string ) -type portCache struct { - lock sync.Mutex - ports map[int]time.Time +func init() { + baseDir, err := os.UserCacheDir() + if err != nil { + baseDir = os.TempDir() + } + cacheDir = filepath.Join(baseDir, "kubebuilder-envtest") + if err := os.MkdirAll(cacheDir, 0750); err != nil { + panic(err) + } } -func (c *portCache) add(port int) bool { - c.lock.Lock() - defer c.lock.Unlock() - // remove outdated port - for p, t := range c.ports { - if time.Since(t) > portReserveTime { - delete(c.ports, p) +type portCache struct{} + +func (c *portCache) add(port int) (bool, error) { + // Remove outdated ports. + if err := fs.WalkDir(os.DirFS(cacheDir), ".", func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() || !d.Type().IsRegular() || !strings.HasPrefix(path, portFilePrefix) { + return nil + } + info, err := d.Info() + if err != nil { + return err } + if time.Since(info.ModTime()) > portReserveTime { + if err := os.Remove(filepath.Join(cacheDir, path)); err != nil { + return err + } + } + return nil + }); err != nil { + return false, err } - // try allocating new port - if _, ok := c.ports[port]; ok { - return false + // Try allocating new port, by acquiring a file. + if err := flock.Acquire(fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port)); os.IsExist(err) { + return false, nil + } else if err != nil { + return false, err } - c.ports[port] = time.Now() - return true + return true, nil } -var cache = &portCache{ - ports: make(map[int]time.Time), -} +var cache = &portCache{} -func suggest(listenHost string) (port int, resolvedHost string, err error) { +func suggest(listenHost string) (int, string, error) { if listenHost == "" { listenHost = "localhost" } addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(listenHost, "0")) if err != nil { - return + return -1, "", err } l, err := net.ListenTCP("tcp", addr) if err != nil { - return + return -1, "", err + } + if err := l.Close(); err != nil { + return -1, "", err } - port = l.Addr().(*net.TCPAddr).Port - defer func() { - err = l.Close() - }() - resolvedHost = addr.IP.String() - return + return l.Addr().(*net.TCPAddr).Port, + addr.IP.String(), + nil } // Suggest suggests an address a process can listen on. It returns // a tuple consisting of a free port and the hostname resolved to its IP. // It makes sure that new port allocated does not conflict with old ports // allocated within 1 minute. -func Suggest(listenHost string) (port int, resolvedHost string, err error) { +func Suggest(listenHost string) (int, string, error) { for i := 0; i < portConflictRetry; i++ { - port, resolvedHost, err = suggest(listenHost) + port, resolvedHost, err := suggest(listenHost) if err != nil { - return + return -1, "", err } - if cache.add(port) { - return + if ok, err := cache.add(port); ok { + return port, resolvedHost, nil + } else if err != nil { + return -1, "", err } } - err = fmt.Errorf("no free ports found after %d retries", portConflictRetry) - return + return -1, "", fmt.Errorf("no free ports found after %d retries", portConflictRetry) } diff --git a/pkg/internal/testing/process/process.go b/pkg/internal/testing/process/process.go index 5a0f7b8565..4675f9b947 100644 --- a/pkg/internal/testing/process/process.go +++ b/pkg/internal/testing/process/process.go @@ -248,6 +248,12 @@ func pollURLUntilOK(url url.URL, interval time.Duration, ready chan bool, stopCh // Stop stops this process gracefully, waits for its termination, and cleans up // the CertDir if necessary. func (ps *State) Stop() error { + // Always clear the directory if we need to. + defer func() { + if ps.DirNeedsCleaning { + _ = os.RemoveAll(ps.Dir) + } + }() if ps.Cmd == nil { return nil } @@ -267,9 +273,5 @@ func (ps *State) Stop() error { return fmt.Errorf("timeout waiting for process %s to stop", path.Base(ps.Path)) } ps.ready = false - if ps.DirNeedsCleaning { - return os.RemoveAll(ps.Dir) - } - return nil }