Skip to content

Commit

Permalink
fix: avoid closing and re-opening port of api server settings change (#…
Browse files Browse the repository at this point in the history
…9778)

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>
  • Loading branch information
alexmt committed Jun 24, 2022
1 parent e715e08 commit 82726fc
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 96 deletions.
7 changes: 4 additions & 3 deletions cmd/argocd-server/commands/argocd_server.go
Expand Up @@ -152,7 +152,9 @@ func NewCommand() *cobra.Command {
stats.RegisterStackDumper()
stats.StartStatsTicker(10 * time.Minute)
stats.RegisterHeapDumper("memprofile")

argocd := server.NewServer(context.Background(), argoCDOpts)
lns, err := argocd.Listen()
errors.CheckError(err)
for {
var closer func()
ctx := context.Background()
Expand All @@ -163,8 +165,7 @@ func NewCommand() *cobra.Command {
log.Fatalf("failed to initialize tracing: %v", err)
}
}
argocd := server.NewServer(ctx, argoCDOpts)
argocd.Run(ctx, listenPort, metricsPort)
argocd.Run(ctx, lns)
cancel()
if closer != nil {
closer()
Expand Down
6 changes: 5 additions & 1 deletion cmd/argocd/commands/headless/headless.go
Expand Up @@ -216,7 +216,11 @@ func StartLocalServer(clientOpts *apiclient.ClientOptions, ctxStr string, port *
RepoClientset: &forwardRepoClientset{namespace: namespace, context: ctxStr},
})

go srv.Run(ctx, *port, 0)
lns, err := srv.Listen()
if err != nil {
return err
}
go srv.Run(ctx, lns)
clientOpts.ServerAddr = fmt.Sprintf("%s:%d", *address, *port)
clientOpts.PlainText = true
if !cache2.WaitForCacheSync(ctx.Done(), srv.Initialized) {
Expand Down
163 changes: 103 additions & 60 deletions server/server.go
Expand Up @@ -282,22 +282,102 @@ func (a *ArgoCDServer) healthCheck(r *http.Request) error {
return nil
}

type Listeners struct {
Main net.Listener
Metrics net.Listener
GatewayConn *grpc.ClientConn
}

func (l *Listeners) Close() error {
if l.Main != nil {
if err := l.Main.Close(); err != nil {
return err
}
l.Main = nil
}
if l.Metrics != nil {
if err := l.Metrics.Close(); err != nil {
return err
}
l.Metrics = nil
}
if l.GatewayConn != nil {
if err := l.GatewayConn.Close(); err != nil {
return err
}
l.GatewayConn = nil
}
return nil
}

func startListener(host string, port int) (net.Listener, error) {
var conn net.Listener
var realErr error
_ = wait.ExponentialBackoff(backoff, func() (bool, error) {
conn, realErr = net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
if realErr != nil {
return false, nil
}
return true, nil
})
return conn, realErr
}

func (a *ArgoCDServer) Listen() (*Listeners, error) {
mainLn, err := startListener(a.ListenHost, a.ListenPort)
if err != nil {
return nil, err
}
metricsLn, err := startListener(a.ListenHost, a.MetricsPort)
if err != nil {
io.Close(mainLn)
return nil, err
}
var dOpts []grpc.DialOption
dOpts = append(dOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(apiclient.MaxGRPCMessageSize)))
dOpts = append(dOpts, grpc.WithUserAgent(fmt.Sprintf("%s/%s", common.ArgoCDUserAgentName, common.GetVersion().Version)))
dOpts = append(dOpts, grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()))
dOpts = append(dOpts, grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()))
if a.useTLS() {
// The following sets up the dial Options for grpc-gateway to talk to gRPC server over TLS.
// grpc-gateway is just translating HTTP/HTTPS requests as gRPC requests over localhost,
// so we need to supply the same certificates to establish the connections that a normal,
// external gRPC client would need.
tlsConfig := a.settings.TLSConfig()
if a.TLSConfigCustomizer != nil {
a.TLSConfigCustomizer(tlsConfig)
}
tlsConfig.InsecureSkipVerify = true
dCreds := credentials.NewTLS(tlsConfig)
dOpts = append(dOpts, grpc.WithTransportCredentials(dCreds))
} else {
dOpts = append(dOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", a.ListenPort), dOpts...)
if err != nil {
io.Close(mainLn)
io.Close(metricsLn)
return nil, err
}
return &Listeners{Main: mainLn, Metrics: metricsLn, GatewayConn: conn}, nil
}

// Run runs the API Server
// We use k8s.io/code-generator/cmd/go-to-protobuf to generate the .proto files from the API types.
// k8s.io/ go-to-protobuf uses protoc-gen-gogo, which comes from gogo/protobuf (a fork of
// golang/protobuf).
func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) {
func (a *ArgoCDServer) Run(ctx context.Context, listeners *Listeners) {
a.userStateStorage.Init(ctx)

grpcS, appResourceTreeFn := a.newGRPCServer()
grpcWebS := grpcweb.WrapServer(grpcS)
var httpS *http.Server
var httpsS *http.Server
if a.useTLS() {
httpS = newRedirectServer(port, a.RootPath)
httpsS = a.newHTTPServer(ctx, port, grpcWebS, appResourceTreeFn)
httpS = newRedirectServer(a.ListenPort, a.RootPath)
httpsS = a.newHTTPServer(ctx, a.ListenPort, grpcWebS, appResourceTreeFn, listeners.GatewayConn)
} else {
httpS = a.newHTTPServer(ctx, port, grpcWebS, appResourceTreeFn)
httpS = a.newHTTPServer(ctx, a.ListenPort, grpcWebS, appResourceTreeFn, listeners.GatewayConn)
}
if a.RootPath != "" {
httpS.Handler = withRootPath(httpS.Handler, a)
Expand All @@ -311,26 +391,13 @@ func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) {
httpsS.Handler = &bug21955Workaround{handler: httpsS.Handler}
}

metricsServ := metrics.NewMetricsServer(a.ListenHost, metricsPort)
metricsServ := metrics.NewMetricsServer(a.ListenHost, a.MetricsPort)
if a.RedisClient != nil {
cacheutil.CollectMetrics(a.RedisClient, metricsServ)
}

// Start listener
var conn net.Listener
var realErr error
_ = wait.ExponentialBackoff(backoff, func() (bool, error) {
conn, realErr = net.Listen("tcp", fmt.Sprintf("%s:%d", a.ListenHost, port))
if realErr != nil {
a.log.Warnf("failed listen: %v", realErr)
return false, nil
}
return true, nil
})
errors.CheckError(realErr)

// CMux is used to support servicing gRPC and HTTP1.1+JSON on the same port
tcpm := cmux.New(conn)
tcpm := cmux.New(listeners.Main)
var tlsm cmux.CMux
var grpcL net.Listener
var httpL net.Listener
Expand Down Expand Up @@ -360,7 +427,7 @@ func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) {

// Start the muxed listeners for our servers
log.Infof("argocd %s serving on port %d (url: %s, tls: %v, namespace: %s, sso: %v)",
common.GetVersion(), port, a.settings.URL, a.useTLS(), a.Namespace, a.settings.IsSSOConfigured())
common.GetVersion(), a.ListenPort, a.settings.URL, a.useTLS(), a.Namespace, a.settings.IsSSOConfigured())

go a.projInformer.Run(ctx.Done())
go a.appInformer.Run(ctx.Done())
Expand All @@ -374,17 +441,13 @@ func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) {
go a.watchSettings()
go a.rbacPolicyLoader(ctx)
go func() { a.checkServeErr("tcpm", tcpm.Serve()) }()
go func() { a.checkServeErr("metrics", metricsServ.ListenAndServe()) }()
go func() { a.checkServeErr("metrics", metricsServ.Serve(listeners.Metrics)) }()
if !cache.WaitForCacheSync(ctx.Done(), a.projInformer.HasSynced, a.appInformer.HasSynced) {
log.Fatal("Timed out waiting for project cache to sync")
}

a.stopCh = make(chan struct{})
<-a.stopCh
errors.CheckError(conn.Close())
if err := metricsServ.Shutdown(ctx); err != nil {
log.Fatalf("Failed to gracefully shutdown metrics server: %v", err)
}
}

func (a *ArgoCDServer) Initialized() bool {
Expand Down Expand Up @@ -702,7 +765,7 @@ func compressHandler(handler http.Handler) http.Handler {

// newHTTPServer returns the HTTP server to serve HTTP/HTTPS requests. This is implemented
// using grpc-gateway as a proxy to the gRPC server.
func (a *ArgoCDServer) newHTTPServer(ctx context.Context, port int, grpcWebHandler http.Handler, appResourceTreeFn application.AppResourceTreeFn) *http.Server {
func (a *ArgoCDServer) newHTTPServer(ctx context.Context, port int, grpcWebHandler http.Handler, appResourceTreeFn application.AppResourceTreeFn, conn *grpc.ClientConn) *http.Server {
endpoint := fmt.Sprintf("localhost:%d", port)
mux := http.NewServeMux()
httpS := http.Server{
Expand All @@ -718,26 +781,6 @@ func (a *ArgoCDServer) newHTTPServer(ctx context.Context, port int, grpcWebHandl
},
},
}
var dOpts []grpc.DialOption
dOpts = append(dOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(apiclient.MaxGRPCMessageSize)))
dOpts = append(dOpts, grpc.WithUserAgent(fmt.Sprintf("%s/%s", common.ArgoCDUserAgentName, common.GetVersion().Version)))
dOpts = append(dOpts, grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()))
dOpts = append(dOpts, grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()))
if a.useTLS() {
// The following sets up the dial Options for grpc-gateway to talk to gRPC server over TLS.
// grpc-gateway is just translating HTTP/HTTPS requests as gRPC requests over localhost,
// so we need to supply the same certificates to establish the connections that a normal,
// external gRPC client would need.
tlsConfig := a.settings.TLSConfig()
if a.TLSConfigCustomizer != nil {
a.TLSConfigCustomizer(tlsConfig)
}
tlsConfig.InsecureSkipVerify = true
dCreds := credentials.NewTLS(tlsConfig)
dOpts = append(dOpts, grpc.WithTransportCredentials(dCreds))
} else {
dOpts = append(dOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

// HTTP 1.1+JSON Server
// grpc-ecosystem/grpc-gateway is used to proxy HTTP requests to the corresponding gRPC call
Expand Down Expand Up @@ -790,17 +833,17 @@ func (a *ArgoCDServer) newHTTPServer(ctx context.Context, port int, grpcWebHandl
terminalHandler.ServeHTTP(writer, request)
})

mustRegisterGWHandler(versionpkg.RegisterVersionServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(clusterpkg.RegisterClusterServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(applicationpkg.RegisterApplicationServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(repositorypkg.RegisterRepositoryServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(repocredspkg.RegisterRepoCredsServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(sessionpkg.RegisterSessionServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(settingspkg.RegisterSettingsServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(projectpkg.RegisterProjectServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(accountpkg.RegisterAccountServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(certificatepkg.RegisterCertificateServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(gpgkeypkg.RegisterGPGKeyServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(versionpkg.RegisterVersionServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(clusterpkg.RegisterClusterServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(applicationpkg.RegisterApplicationServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(repositorypkg.RegisterRepositoryServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(repocredspkg.RegisterRepoCredsServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(sessionpkg.RegisterSessionServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(settingspkg.RegisterSettingsServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(projectpkg.RegisterProjectServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(accountpkg.RegisterAccountServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(certificatepkg.RegisterCertificateServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(gpgkeypkg.RegisterGPGKeyServiceHandler, ctx, gwmux, conn)

// Swagger UI
swagger.ServeSwaggerUI(mux, assets.SwaggerJSON, "/swagger-ui", a.RootPath)
Expand Down Expand Up @@ -968,11 +1011,11 @@ func isMainJsBundle(url *url.URL) bool {
return mainJsBundleRegex.Match([]byte(filename))
}

type registerFunc func(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error
type registerFunc func(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error

// mustRegisterGWHandler is a convenience function to register a gateway handler
func mustRegisterGWHandler(register registerFunc, ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) {
err := register(ctx, mux, endpoint, opts)
func mustRegisterGWHandler(register registerFunc, ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) {
err := register(ctx, mux, conn)
if err != nil {
panic(err)
}
Expand Down
48 changes: 16 additions & 32 deletions server/server_norace_test.go
Expand Up @@ -29,20 +29,16 @@ func TestUserAgent(t *testing.T) {

s, closer := fakeServer()
defer closer()
lns, err := s.Listen()
assert.NoError(t, err)

cancelInformer := test.StartInformer(s.projInformer)
defer cancelInformer()
port, err := test.GetFreePort()
assert.NoError(t, err)
metricsPort, err := test.GetFreePort()
assert.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go s.Run(ctx, port, metricsPort)
go s.Run(ctx, lns)
defer func() { time.Sleep(3 * time.Second) }()

err = test.WaitForPortListen(fmt.Sprintf("127.0.0.1:%d", port), 10*time.Second)
assert.NoError(t, err)

type testData struct {
userAgent string
errorMsg string
Expand Down Expand Up @@ -72,7 +68,7 @@ func TestUserAgent(t *testing.T) {

for _, test := range tests {
opts := apiclient.ClientOptions{
ServerAddr: fmt.Sprintf("localhost:%d", port),
ServerAddr: fmt.Sprintf("localhost:%d", s.ListenPort),
PlainText: true,
UserAgent: test.userAgent,
}
Expand All @@ -99,25 +95,20 @@ func Test_StaticHeaders(t *testing.T) {
{
s, closer := fakeServer()
defer closer()
lns, err := s.Listen()
assert.NoError(t, err)
cancelInformer := test.StartInformer(s.projInformer)
defer cancelInformer()
port, err := test.GetFreePort()
assert.NoError(t, err)
metricsPort, err := test.GetFreePort()
assert.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go s.Run(ctx, port, metricsPort)
go s.Run(ctx, lns)
defer func() { time.Sleep(3 * time.Second) }()

err = test.WaitForPortListen(fmt.Sprintf("127.0.0.1:%d", port), 10*time.Second)
assert.NoError(t, err)

// Allow server startup
time.Sleep(1 * time.Second)

client := http.Client{}
url := fmt.Sprintf("http://127.0.0.1:%d/test.html", port)
url := fmt.Sprintf("http://127.0.0.1:%d/test.html", s.ListenPort)
req, err := http.NewRequest("GET", url, nil)
assert.NoError(t, err)
resp, err := client.Do(req)
Expand All @@ -134,23 +125,18 @@ func Test_StaticHeaders(t *testing.T) {
s.ContentSecurityPolicy = "frame-ancestors 'none';"
cancelInformer := test.StartInformer(s.projInformer)
defer cancelInformer()
port, err := test.GetFreePort()
assert.NoError(t, err)
metricsPort, err := test.GetFreePort()
lns, err := s.Listen()
assert.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go s.Run(ctx, port, metricsPort)
go s.Run(ctx, lns)
defer func() { time.Sleep(3 * time.Second) }()

err = test.WaitForPortListen(fmt.Sprintf("127.0.0.1:%d", port), 10*time.Second)
assert.NoError(t, err)

// Allow server startup
time.Sleep(1 * time.Second)

client := http.Client{}
url := fmt.Sprintf("http://127.0.0.1:%d/test.html", port)
url := fmt.Sprintf("http://127.0.0.1:%d/test.html", s.ListenPort)
req, err := http.NewRequest("GET", url, nil)
assert.NoError(t, err)
resp, err := client.Do(req)
Expand All @@ -167,23 +153,21 @@ func Test_StaticHeaders(t *testing.T) {
s.ContentSecurityPolicy = ""
cancelInformer := test.StartInformer(s.projInformer)
defer cancelInformer()
port, err := test.GetFreePort()
assert.NoError(t, err)
metricsPort, err := test.GetFreePort()
lns, err := s.Listen()
assert.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go s.Run(ctx, port, metricsPort)
go s.Run(ctx, lns)
defer func() { time.Sleep(3 * time.Second) }()

err = test.WaitForPortListen(fmt.Sprintf("127.0.0.1:%d", port), 10*time.Second)
err = test.WaitForPortListen(fmt.Sprintf("127.0.0.1:%d", s.ListenPort), 10*time.Second)
assert.NoError(t, err)

// Allow server startup
time.Sleep(1 * time.Second)

client := http.Client{}
url := fmt.Sprintf("http://127.0.0.1:%d/test.html", port)
url := fmt.Sprintf("http://127.0.0.1:%d/test.html", s.ListenPort)
req, err := http.NewRequest("GET", url, nil)
assert.NoError(t, err)
resp, err := client.Do(req)
Expand Down

0 comments on commit 82726fc

Please sign in to comment.