Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix port allocation race condition during tests. #215

Merged
merged 5 commits into from
Apr 17, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/opencensus-integrations/redigo v2.0.1+incompatible
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pkg/errors v0.8.0
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 // indirect
github.com/prometheus/procfs v0.0.0-20190403104016-ea9eea638872 // indirect
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/9
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
12 changes: 11 additions & 1 deletion internal/app/mmforc/mmforc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"strings"
"time"

"github.com/GoogleCloudPlatform/open-match/internal/port"

"github.com/GoogleCloudPlatform/open-match/config"
"github.com/GoogleCloudPlatform/open-match/internal/logging"
"github.com/GoogleCloudPlatform/open-match/internal/metrics"
Expand Down Expand Up @@ -106,7 +108,15 @@ func initializeApplication() (config.View, error) {
ocMmforcViews := DefaultMmforcViews // mmforc OpenCensus views.
ocMmforcViews = append(ocMmforcViews, redigometrics.ObservabilityMetricViews...) // redis OpenCensus views.
mmforcLog.WithFields(log.Fields{"viewscount": len(ocMmforcViews)}).Info("Loaded OpenCensus views")
metrics.ConfigureOpenCensusPrometheusExporter(cfg, ocMmforcViews)

port, err := port.PortFromNumber(cfg.GetInt("metrics.port"))
if err != nil {
mmforcLog.WithFields(log.Fields{
"error": err.Error(),
}).Error("Unable to create metrics TCP listener")
return nil, err
}
metrics.ConfigureOpenCensusPrometheusExporter(port, cfg, ocMmforcViews)
return cfg, nil
}

Expand Down
18 changes: 13 additions & 5 deletions internal/metrics/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"context"
"fmt"
"net/http"
"strconv"
"time"

"github.com/GoogleCloudPlatform/open-match/config"
"github.com/GoogleCloudPlatform/open-match/internal/port"
log "github.com/sirupsen/logrus"

"go.opencensus.io/exporter/prometheus"
Expand All @@ -46,10 +46,10 @@ var (
// by Promethus for metrics gathering. The calling code can select any views
// it wants to register, from any number of libraries, and pass them in as an
// array.
func ConfigureOpenCensusPrometheusExporter(cfg config.View, views []*view.View) {
func ConfigureOpenCensusPrometheusExporter(p *port.Port, cfg config.View, views []*view.View) {

//var infoCtx, err = tag.New(context.Background(), tag.Insert(KeySeverity, "info"))
metricsPort := cfg.GetInt("metrics.port")
metricsPort := p.Number()
metricsEP := cfg.GetString("metrics.endpoint")
metricsRP := cfg.GetInt("metrics.reportingPeriod")

Expand All @@ -72,7 +72,7 @@ func ConfigureOpenCensusPrometheusExporter(cfg config.View, views []*view.View)
// Change the frequency of updates to the metrics endpoint
view.SetReportingPeriod(time.Duration(metricsRP) * time.Second)
mhLog.WithFields(log.Fields{
"port": metricsPort,
"port": p.Number(),
"endpoint": metricsEP,
"retentionPeriod": metricsRP,
}).Info("Opencensus measurement serving to Prometheus configured")
Expand All @@ -85,7 +85,15 @@ func ConfigureOpenCensusPrometheusExporter(cfg config.View, views []*view.View)
"port": metricsPort,
"endpoint": metricsEP,
}).Info("Attempting to start http server for OpenCensus metrics on localhost")
err := http.ListenAndServe(":"+strconv.Itoa(metricsPort), mux)
listener, err := p.Obtain()
if err != nil {
mhLog.WithFields(log.Fields{
"error": err,
"port": metricsPort,
"endpoint": metricsEP,
}).Fatal("Failed to run Prometheus endpoint")
}
err = http.Serve(listener, mux)
if err != nil {
mhLog.WithFields(log.Fields{
"error": err,
Expand Down
65 changes: 65 additions & 0 deletions internal/port/port.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package port

import (
"fmt"
"net"

"github.com/pkg/errors"
)

// Port is a holder for a TCP port.
type Port struct {
number int
listener net.Listener
}

// Obtain returns the TCP listener. This method can only be called once.
func (p *Port) Obtain() (net.Listener, error) {
listener := p.listener
p.listener = nil
if listener == nil {
return nil, errors.WithStack(fmt.Errorf("cannot Obtain() listener for %d because already handed off", p.number))
}
return listener, nil
}

// String returns the port number as a string.
/*
func (p *Port) String() string {
return fmt.Sprintf("%d", p.number)
}
*/
// Number returns the port number.
func (p *Port) Number() int {
return p.number
}

// Close shutsdown the TCP listener.
func (p *Port) Close() error {
if p.listener != nil {
return p.listener.Close()
}
return nil
}

// PortFromNumber opens a TCP based on the port number provided.
func PortFromNumber(port int) (*Port, error) {
addr := "0.0.0.0"
conn, err := net.Listen("tcp", addr)
jeremyje marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
fmt.Printf("SELECTPORT %d\n", port)
return &Port{
number: port,
listener: conn,
}, nil
}
return nil, err
}

// CreatePort creates a Port object from TCP listener.
func CreatePort(port int, listener net.Listener) *Port {
return &Port{
number: port,
listener: listener,
}
}
50 changes: 50 additions & 0 deletions internal/port/testing/nextport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package testing

import (
"fmt"
"net"
"sync/atomic"

"github.com/GoogleCloudPlatform/open-match/internal/port"
)

const (
firstPort = 10000
lastPort = 60000
maxAttempts = 1000
)

var (
currentPort = int32(firstPort)
)

// MustPort finds the next available port to open for TCP connections, used in tests to make them isolated.
func MustPort() *port.Port {
port, err := nextPort()
if err != nil {
panic(err)
}
return port
}

// NextPort finds the next available port to open for TCP connections, used in tests to make them isolated.
func nextPort() (*port.Port, error) {
success := false
attempt := 0
for !success && attempt <= maxAttempts {
p := int(atomic.AddInt32(&currentPort, 1))
if p > lastPort {
atomic.CompareAndSwapInt32(&currentPort, int32(p), firstPort)
} else {
addr := fmt.Sprintf(":%d", p)
conn, err := net.Listen("tcp", addr)

if err == nil {
fmt.Printf("SELECTPORT %d\n", p)
return port.CreatePort(p, conn), nil
}
}
attempt++
}
return nil, fmt.Errorf("cannot find open port exhausted %d attempts", maxAttempts)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ func TestNextPort(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

port, err := NextPort()
port, err := nextPort()
defer port.Close()
if err != nil {
t.Errorf("%s had error, %s", testName, err)
}
if !(firstPort <= port && port <= lastPort) {
if !(firstPort <= port.Number() && port.Number() <= lastPort) {
t.Errorf("Expected %d <= %d <= %d, port is out of range.", firstPort, port, lastPort)
}
})
Expand Down
16 changes: 14 additions & 2 deletions internal/serving/construction.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/GoogleCloudPlatform/open-match/config"
"github.com/GoogleCloudPlatform/open-match/internal/logging"
"github.com/GoogleCloudPlatform/open-match/internal/metrics"
"github.com/GoogleCloudPlatform/open-match/internal/port"
redishelpers "github.com/GoogleCloudPlatform/open-match/internal/statestorage/redis"
"github.com/opencensus-integrations/redigo/redis"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -67,7 +68,12 @@ func NewMulti(paramsList []*ServerParams) (*OpenMatchServer, error) {
ocServerViews = append(ocServerViews, config.CfgVarCountView) // config loader view.
ocServerViews = append(ocServerViews, redis.ObservabilityMetricViews...) // redis OpenCensus views.
logger.WithFields(log.Fields{"viewscount": len(ocServerViews)}).Info("Loaded OpenCensus views")
metrics.ConfigureOpenCensusPrometheusExporter(cfg, ocServerViews)
metricsPort, err := port.PortFromNumber(cfg.GetInt("metrics.port"))
if err != nil {
logger.Fatal(err)
return nil, err
}
metrics.ConfigureOpenCensusPrometheusExporter(metricsPort, cfg, ocServerViews)

// Connect to redis
pool, err := redishelpers.ConnectionPool(cfg)
Expand All @@ -77,7 +83,13 @@ func NewMulti(paramsList []*ServerParams) (*OpenMatchServer, error) {
}

// Instantiate the gRPC server with the bindings we've made.
grpcServer := NewGrpcServer(cfg.GetInt(paramsList[0].PortConfigName), logger)
port, err := port.PortFromNumber(cfg.GetInt(paramsList[0].PortConfigName))
if err != nil {
logger.Fatal(err)
return nil, err
}

grpcServer := NewGrpcServer(port, logger)

omServer := &OpenMatchServer{
GrpcServer: grpcServer,
Expand Down
14 changes: 7 additions & 7 deletions internal/serving/grpcserver.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package serving

import (
"fmt"
"net"

"github.com/GoogleCloudPlatform/open-match/internal/port"
log "github.com/sirupsen/logrus"
"go.opencensus.io/plugin/ocgrpc"

Expand All @@ -12,7 +12,7 @@ import (

// GrpcWrapper is a decoration around the standard GRPC server that sets up a bunch of things common to Open Match servers.
type GrpcWrapper struct {
port int
port *port.Port
handlerFuncs []func(*grpc.Server)
server *grpc.Server
ln net.Listener
Expand All @@ -21,7 +21,7 @@ type GrpcWrapper struct {
}

// NewGrpcServer creates a new GrpcWrapper.
func NewGrpcServer(port int, logger *log.Entry) *GrpcWrapper {
func NewGrpcServer(port *port.Port, logger *log.Entry) *GrpcWrapper {
return &GrpcWrapper{
port: port,
logger: logger,
Expand All @@ -39,17 +39,17 @@ func (gw *GrpcWrapper) Start() error {
if gw.ln != nil {
return nil
}
ln, err := net.Listen("tcp", fmt.Sprintf(":%d", gw.port))
ln, err := gw.port.Obtain()
if err != nil {
gw.logger.WithFields(log.Fields{
"error": err.Error(),
"port": gw.port,
"port": gw.port.Number(),
}).Error("net.Listen() error")
return err
}
gw.ln = ln

gw.logger.WithFields(log.Fields{"port": gw.port}).Info("TCP net listener initialized")
gw.logger.WithFields(log.Fields{"port": gw.port.Number()}).Info("TCP net listener initialized")

server := grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{}))
for _, handlerFunc := range gw.handlerFuncs {
Expand All @@ -59,7 +59,7 @@ func (gw *GrpcWrapper) Start() error {
gw.grpcAwaiter = make(chan error)

go func() {
gw.logger.Infof("Serving gRPC on :%d", gw.port)
gw.logger.Infof("Serving gRPC on :%d", gw.port.Number())
err := gw.server.Serve(ln)
gw.grpcAwaiter <- err
if err != nil {
Expand Down
18 changes: 6 additions & 12 deletions internal/serving/testing/minimatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/GoogleCloudPlatform/open-match/internal/logging"
"github.com/GoogleCloudPlatform/open-match/internal/metrics"
"github.com/GoogleCloudPlatform/open-match/internal/pb"
portTesting "github.com/GoogleCloudPlatform/open-match/internal/port/testing"
"github.com/GoogleCloudPlatform/open-match/internal/serving"
redishelpers "github.com/GoogleCloudPlatform/open-match/internal/statestorage/redis"
"github.com/alicebob/miniredis"
Expand Down Expand Up @@ -90,12 +91,9 @@ func createOpenMatchServer(paramsList []*serving.ServerParams) (*MiniMatchServer
cfg.Set("logging.format", "text")
cfg.Set("logging.source", true)

promPort, err := NextPort()
if err != nil {
return nil, fmt.Errorf("cannot create port for prometheus, %s", err)
}
promPort := portTesting.MustPort()

cfg.Set("metrics.port", promPort)
cfg.Set("metrics.port", promPort.Number())
cfg.Set("metrics.endpoint", "/metrics")
cfg.Set("metrics.reportingPeriod", "5s")

Expand All @@ -116,7 +114,7 @@ func createOpenMatchServer(paramsList []*serving.ServerParams) (*MiniMatchServer
ocServerViews = append(ocServerViews, config.CfgVarCountView) // config loader view.
ocServerViews = append(ocServerViews, redis.ObservabilityMetricViews...) // redis OpenCensus views.
logger.WithFields(log.Fields{"viewscount": len(ocServerViews)}).Info("Loaded OpenCensus views")
metrics.ConfigureOpenCensusPrometheusExporter(cfg, ocServerViews)
metrics.ConfigureOpenCensusPrometheusExporter(promPort, cfg, ocServerViews)

// Connect to redis
mredis, err := miniredis.Run()
Expand Down Expand Up @@ -152,13 +150,9 @@ func createOpenMatchServer(paramsList []*serving.ServerParams) (*MiniMatchServer
return nil, err
}

grpcPort, err := NextPort()
if err != nil {
closeOnFailure()
return nil, fmt.Errorf("cannot provision a port for gRPC, %s", err)
}
grpcPort := portTesting.MustPort()
for _, params := range paramsList {
cfg.Set(params.PortConfigName, grpcPort)
cfg.Set(params.PortConfigName, grpcPort.Number())
}

// Instantiate the gRPC server with the connections we've made
Expand Down