diff --git a/cmd/argocd-server/commands/argocd_server.go b/cmd/argocd-server/commands/argocd_server.go index e7df41a82e5c..a5d2f8c2efdb 100644 --- a/cmd/argocd-server/commands/argocd_server.go +++ b/cmd/argocd-server/commands/argocd_server.go @@ -152,7 +152,9 @@ func NewCommand() *cobra.Command { stats.RegisterStackDumper() stats.StartStatsTicker(10 * time.Minute) stats.RegisterHeapDumper("memprofile") - + argocd := server.NewServer(context.Background(), argoCDOpts) + lns, err := argocd.Listen() + errors.CheckError(err) for { var closer func() ctx := context.Background() @@ -163,8 +165,7 @@ func NewCommand() *cobra.Command { log.Fatalf("failed to initialize tracing: %v", err) } } - argocd := server.NewServer(ctx, argoCDOpts) - argocd.Run(ctx, listenPort, metricsPort) + argocd.Run(ctx, lns) cancel() if closer != nil { closer() diff --git a/cmd/argocd/commands/headless/headless.go b/cmd/argocd/commands/headless/headless.go index 720139109778..3f532d60cc43 100644 --- a/cmd/argocd/commands/headless/headless.go +++ b/cmd/argocd/commands/headless/headless.go @@ -216,7 +216,11 @@ func StartLocalServer(clientOpts *apiclient.ClientOptions, ctxStr string, port * RepoClientset: &forwardRepoClientset{namespace: namespace, context: ctxStr}, }) - go srv.Run(ctx, *port, 0) + lns, err := srv.Listen() + if err != nil { + return err + } + go srv.Run(ctx, lns) clientOpts.ServerAddr = fmt.Sprintf("%s:%d", *address, *port) clientOpts.PlainText = true if !cache2.WaitForCacheSync(ctx.Done(), srv.Initialized) { diff --git a/server/server.go b/server/server.go index 5222244378c5..28485841bb33 100644 --- a/server/server.go +++ b/server/server.go @@ -282,11 +282,91 @@ func (a *ArgoCDServer) healthCheck(r *http.Request) error { return nil } +type Listeners struct { + Main net.Listener + Metrics net.Listener + GatewayConn *grpc.ClientConn +} + +func (l *Listeners) Close() error { + if l.Main != nil { + if err := l.Main.Close(); err != nil { + return err + } + l.Main = nil + } + if l.Metrics != nil { + if err := l.Metrics.Close(); err != nil { + return err + } + l.Metrics = nil + } + if l.GatewayConn != nil { + if err := l.GatewayConn.Close(); err != nil { + return err + } + l.GatewayConn = nil + } + return nil +} + +func startListener(host string, port int) (net.Listener, error) { + var conn net.Listener + var realErr error + _ = wait.ExponentialBackoff(backoff, func() (bool, error) { + conn, realErr = net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) + if realErr != nil { + return false, nil + } + return true, nil + }) + return conn, realErr +} + +func (a *ArgoCDServer) Listen() (*Listeners, error) { + mainLn, err := startListener(a.ListenHost, a.ListenPort) + if err != nil { + return nil, err + } + metricsLn, err := startListener(a.ListenHost, a.MetricsPort) + if err != nil { + io.Close(mainLn) + return nil, err + } + var dOpts []grpc.DialOption + dOpts = append(dOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(apiclient.MaxGRPCMessageSize))) + dOpts = append(dOpts, grpc.WithUserAgent(fmt.Sprintf("%s/%s", common.ArgoCDUserAgentName, common.GetVersion().Version))) + dOpts = append(dOpts, grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor())) + dOpts = append(dOpts, grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor())) + if a.useTLS() { + // The following sets up the dial Options for grpc-gateway to talk to gRPC server over TLS. + // grpc-gateway is just translating HTTP/HTTPS requests as gRPC requests over localhost, + // so we need to supply the same certificates to establish the connections that a normal, + // external gRPC client would need. + tlsConfig := a.settings.TLSConfig() + if a.TLSConfigCustomizer != nil { + a.TLSConfigCustomizer(tlsConfig) + } + tlsConfig.InsecureSkipVerify = true + dCreds := credentials.NewTLS(tlsConfig) + dOpts = append(dOpts, grpc.WithTransportCredentials(dCreds)) + } else { + dOpts = append(dOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", a.ListenPort), dOpts...) + if err != nil { + io.Close(mainLn) + io.Close(metricsLn) + return nil, err + } + return &Listeners{Main: mainLn, Metrics: metricsLn, GatewayConn: conn}, nil +} + // Run runs the API Server // We use k8s.io/code-generator/cmd/go-to-protobuf to generate the .proto files from the API types. // k8s.io/ go-to-protobuf uses protoc-gen-gogo, which comes from gogo/protobuf (a fork of // golang/protobuf). -func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) { +func (a *ArgoCDServer) Run(ctx context.Context, listeners *Listeners) { a.userStateStorage.Init(ctx) grpcS, appResourceTreeFn := a.newGRPCServer() @@ -294,10 +374,10 @@ func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) { var httpS *http.Server var httpsS *http.Server if a.useTLS() { - httpS = newRedirectServer(port, a.RootPath) - httpsS = a.newHTTPServer(ctx, port, grpcWebS, appResourceTreeFn) + httpS = newRedirectServer(a.ListenPort, a.RootPath) + httpsS = a.newHTTPServer(ctx, a.ListenPort, grpcWebS, appResourceTreeFn, listeners.GatewayConn) } else { - httpS = a.newHTTPServer(ctx, port, grpcWebS, appResourceTreeFn) + httpS = a.newHTTPServer(ctx, a.ListenPort, grpcWebS, appResourceTreeFn, listeners.GatewayConn) } if a.RootPath != "" { httpS.Handler = withRootPath(httpS.Handler, a) @@ -311,26 +391,13 @@ func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) { httpsS.Handler = &bug21955Workaround{handler: httpsS.Handler} } - metricsServ := metrics.NewMetricsServer(a.ListenHost, metricsPort) + metricsServ := metrics.NewMetricsServer(a.ListenHost, a.MetricsPort) if a.RedisClient != nil { cacheutil.CollectMetrics(a.RedisClient, metricsServ) } - // Start listener - var conn net.Listener - var realErr error - _ = wait.ExponentialBackoff(backoff, func() (bool, error) { - conn, realErr = net.Listen("tcp", fmt.Sprintf("%s:%d", a.ListenHost, port)) - if realErr != nil { - a.log.Warnf("failed listen: %v", realErr) - return false, nil - } - return true, nil - }) - errors.CheckError(realErr) - // CMux is used to support servicing gRPC and HTTP1.1+JSON on the same port - tcpm := cmux.New(conn) + tcpm := cmux.New(listeners.Main) var tlsm cmux.CMux var grpcL net.Listener var httpL net.Listener @@ -360,7 +427,7 @@ func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) { // Start the muxed listeners for our servers log.Infof("argocd %s serving on port %d (url: %s, tls: %v, namespace: %s, sso: %v)", - common.GetVersion(), port, a.settings.URL, a.useTLS(), a.Namespace, a.settings.IsSSOConfigured()) + common.GetVersion(), a.ListenPort, a.settings.URL, a.useTLS(), a.Namespace, a.settings.IsSSOConfigured()) go a.projInformer.Run(ctx.Done()) go a.appInformer.Run(ctx.Done()) @@ -374,17 +441,13 @@ func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) { go a.watchSettings() go a.rbacPolicyLoader(ctx) go func() { a.checkServeErr("tcpm", tcpm.Serve()) }() - go func() { a.checkServeErr("metrics", metricsServ.ListenAndServe()) }() + go func() { a.checkServeErr("metrics", metricsServ.Serve(listeners.Metrics)) }() if !cache.WaitForCacheSync(ctx.Done(), a.projInformer.HasSynced, a.appInformer.HasSynced) { log.Fatal("Timed out waiting for project cache to sync") } a.stopCh = make(chan struct{}) <-a.stopCh - errors.CheckError(conn.Close()) - if err := metricsServ.Shutdown(ctx); err != nil { - log.Fatalf("Failed to gracefully shutdown metrics server: %v", err) - } } func (a *ArgoCDServer) Initialized() bool { @@ -702,7 +765,7 @@ func compressHandler(handler http.Handler) http.Handler { // newHTTPServer returns the HTTP server to serve HTTP/HTTPS requests. This is implemented // using grpc-gateway as a proxy to the gRPC server. -func (a *ArgoCDServer) newHTTPServer(ctx context.Context, port int, grpcWebHandler http.Handler, appResourceTreeFn application.AppResourceTreeFn) *http.Server { +func (a *ArgoCDServer) newHTTPServer(ctx context.Context, port int, grpcWebHandler http.Handler, appResourceTreeFn application.AppResourceTreeFn, conn *grpc.ClientConn) *http.Server { endpoint := fmt.Sprintf("localhost:%d", port) mux := http.NewServeMux() httpS := http.Server{ @@ -718,26 +781,6 @@ func (a *ArgoCDServer) newHTTPServer(ctx context.Context, port int, grpcWebHandl }, }, } - var dOpts []grpc.DialOption - dOpts = append(dOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(apiclient.MaxGRPCMessageSize))) - dOpts = append(dOpts, grpc.WithUserAgent(fmt.Sprintf("%s/%s", common.ArgoCDUserAgentName, common.GetVersion().Version))) - dOpts = append(dOpts, grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor())) - dOpts = append(dOpts, grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor())) - if a.useTLS() { - // The following sets up the dial Options for grpc-gateway to talk to gRPC server over TLS. - // grpc-gateway is just translating HTTP/HTTPS requests as gRPC requests over localhost, - // so we need to supply the same certificates to establish the connections that a normal, - // external gRPC client would need. - tlsConfig := a.settings.TLSConfig() - if a.TLSConfigCustomizer != nil { - a.TLSConfigCustomizer(tlsConfig) - } - tlsConfig.InsecureSkipVerify = true - dCreds := credentials.NewTLS(tlsConfig) - dOpts = append(dOpts, grpc.WithTransportCredentials(dCreds)) - } else { - dOpts = append(dOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) - } // HTTP 1.1+JSON Server // grpc-ecosystem/grpc-gateway is used to proxy HTTP requests to the corresponding gRPC call @@ -790,17 +833,17 @@ func (a *ArgoCDServer) newHTTPServer(ctx context.Context, port int, grpcWebHandl terminalHandler.ServeHTTP(writer, request) }) - mustRegisterGWHandler(versionpkg.RegisterVersionServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts) - mustRegisterGWHandler(clusterpkg.RegisterClusterServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts) - mustRegisterGWHandler(applicationpkg.RegisterApplicationServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts) - mustRegisterGWHandler(repositorypkg.RegisterRepositoryServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts) - mustRegisterGWHandler(repocredspkg.RegisterRepoCredsServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts) - mustRegisterGWHandler(sessionpkg.RegisterSessionServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts) - mustRegisterGWHandler(settingspkg.RegisterSettingsServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts) - mustRegisterGWHandler(projectpkg.RegisterProjectServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts) - mustRegisterGWHandler(accountpkg.RegisterAccountServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts) - mustRegisterGWHandler(certificatepkg.RegisterCertificateServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts) - mustRegisterGWHandler(gpgkeypkg.RegisterGPGKeyServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts) + mustRegisterGWHandler(versionpkg.RegisterVersionServiceHandler, ctx, gwmux, conn) + mustRegisterGWHandler(clusterpkg.RegisterClusterServiceHandler, ctx, gwmux, conn) + mustRegisterGWHandler(applicationpkg.RegisterApplicationServiceHandler, ctx, gwmux, conn) + mustRegisterGWHandler(repositorypkg.RegisterRepositoryServiceHandler, ctx, gwmux, conn) + mustRegisterGWHandler(repocredspkg.RegisterRepoCredsServiceHandler, ctx, gwmux, conn) + mustRegisterGWHandler(sessionpkg.RegisterSessionServiceHandler, ctx, gwmux, conn) + mustRegisterGWHandler(settingspkg.RegisterSettingsServiceHandler, ctx, gwmux, conn) + mustRegisterGWHandler(projectpkg.RegisterProjectServiceHandler, ctx, gwmux, conn) + mustRegisterGWHandler(accountpkg.RegisterAccountServiceHandler, ctx, gwmux, conn) + mustRegisterGWHandler(certificatepkg.RegisterCertificateServiceHandler, ctx, gwmux, conn) + mustRegisterGWHandler(gpgkeypkg.RegisterGPGKeyServiceHandler, ctx, gwmux, conn) // Swagger UI swagger.ServeSwaggerUI(mux, assets.SwaggerJSON, "/swagger-ui", a.RootPath) @@ -968,11 +1011,11 @@ func isMainJsBundle(url *url.URL) bool { return mainJsBundleRegex.Match([]byte(filename)) } -type registerFunc func(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error +type registerFunc func(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error // mustRegisterGWHandler is a convenience function to register a gateway handler -func mustRegisterGWHandler(register registerFunc, ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) { - err := register(ctx, mux, endpoint, opts) +func mustRegisterGWHandler(register registerFunc, ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) { + err := register(ctx, mux, conn) if err != nil { panic(err) } diff --git a/server/server_norace_test.go b/server/server_norace_test.go index e5f88c51cab1..737186dd44c9 100644 --- a/server/server_norace_test.go +++ b/server/server_norace_test.go @@ -29,20 +29,16 @@ func TestUserAgent(t *testing.T) { s, closer := fakeServer() defer closer() + lns, err := s.Listen() + assert.NoError(t, err) + cancelInformer := test.StartInformer(s.projInformer) defer cancelInformer() - port, err := test.GetFreePort() - assert.NoError(t, err) - metricsPort, err := test.GetFreePort() - assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go s.Run(ctx, port, metricsPort) + go s.Run(ctx, lns) defer func() { time.Sleep(3 * time.Second) }() - err = test.WaitForPortListen(fmt.Sprintf("127.0.0.1:%d", port), 10*time.Second) - assert.NoError(t, err) - type testData struct { userAgent string errorMsg string @@ -72,7 +68,7 @@ func TestUserAgent(t *testing.T) { for _, test := range tests { opts := apiclient.ClientOptions{ - ServerAddr: fmt.Sprintf("localhost:%d", port), + ServerAddr: fmt.Sprintf("localhost:%d", s.ListenPort), PlainText: true, UserAgent: test.userAgent, } @@ -99,25 +95,20 @@ func Test_StaticHeaders(t *testing.T) { { s, closer := fakeServer() defer closer() + lns, err := s.Listen() + assert.NoError(t, err) cancelInformer := test.StartInformer(s.projInformer) defer cancelInformer() - port, err := test.GetFreePort() - assert.NoError(t, err) - metricsPort, err := test.GetFreePort() - assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go s.Run(ctx, port, metricsPort) + go s.Run(ctx, lns) defer func() { time.Sleep(3 * time.Second) }() - err = test.WaitForPortListen(fmt.Sprintf("127.0.0.1:%d", port), 10*time.Second) - assert.NoError(t, err) - // Allow server startup time.Sleep(1 * time.Second) client := http.Client{} - url := fmt.Sprintf("http://127.0.0.1:%d/test.html", port) + url := fmt.Sprintf("http://127.0.0.1:%d/test.html", s.ListenPort) req, err := http.NewRequest("GET", url, nil) assert.NoError(t, err) resp, err := client.Do(req) @@ -134,23 +125,18 @@ func Test_StaticHeaders(t *testing.T) { s.ContentSecurityPolicy = "frame-ancestors 'none';" cancelInformer := test.StartInformer(s.projInformer) defer cancelInformer() - port, err := test.GetFreePort() - assert.NoError(t, err) - metricsPort, err := test.GetFreePort() + lns, err := s.Listen() assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go s.Run(ctx, port, metricsPort) + go s.Run(ctx, lns) defer func() { time.Sleep(3 * time.Second) }() - err = test.WaitForPortListen(fmt.Sprintf("127.0.0.1:%d", port), 10*time.Second) - assert.NoError(t, err) - // Allow server startup time.Sleep(1 * time.Second) client := http.Client{} - url := fmt.Sprintf("http://127.0.0.1:%d/test.html", port) + url := fmt.Sprintf("http://127.0.0.1:%d/test.html", s.ListenPort) req, err := http.NewRequest("GET", url, nil) assert.NoError(t, err) resp, err := client.Do(req) @@ -167,23 +153,21 @@ func Test_StaticHeaders(t *testing.T) { s.ContentSecurityPolicy = "" cancelInformer := test.StartInformer(s.projInformer) defer cancelInformer() - port, err := test.GetFreePort() - assert.NoError(t, err) - metricsPort, err := test.GetFreePort() + lns, err := s.Listen() assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go s.Run(ctx, port, metricsPort) + go s.Run(ctx, lns) defer func() { time.Sleep(3 * time.Second) }() - err = test.WaitForPortListen(fmt.Sprintf("127.0.0.1:%d", port), 10*time.Second) + err = test.WaitForPortListen(fmt.Sprintf("127.0.0.1:%d", s.ListenPort), 10*time.Second) assert.NoError(t, err) // Allow server startup time.Sleep(1 * time.Second) client := http.Client{} - url := fmt.Sprintf("http://127.0.0.1:%d/test.html", port) + url := fmt.Sprintf("http://127.0.0.1:%d/test.html", s.ListenPort) req, err := http.NewRequest("GET", url, nil) assert.NoError(t, err) resp, err := client.Do(req) diff --git a/server/server_test.go b/server/server_test.go index 063e0835bc47..e698ec4b7b44 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -41,8 +41,13 @@ func fakeServer() (*ArgoCDServer, func()) { kubeclientset := fake.NewSimpleClientset(cm, secret) appClientSet := apps.NewSimpleClientset() redis, closer := test.NewInMemoryRedis() + port, err := test.GetFreePort() + if err != nil { + panic(err) + } argoCDOpts := ArgoCDServerOpts{ + ListenPort: port, Namespace: test.FakeArgoCDNamespace, KubeClientset: kubeclientset, AppClientset: appClientSet,