Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid closing and re-opening port of api server settings change #9778

Merged
merged 1 commit into from Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/argocd-server/commands/argocd_server.go
Expand Up @@ -152,7 +152,9 @@ func NewCommand() *cobra.Command {
stats.RegisterStackDumper()
stats.StartStatsTicker(10 * time.Minute)
stats.RegisterHeapDumper("memprofile")

argocd := server.NewServer(context.Background(), argoCDOpts)
lns, err := argocd.Listen()
errors.CheckError(err)
for {
var closer func()
ctx := context.Background()
Expand All @@ -163,8 +165,7 @@ func NewCommand() *cobra.Command {
log.Fatalf("failed to initialize tracing: %v", err)
}
}
argocd := server.NewServer(ctx, argoCDOpts)
argocd.Run(ctx, listenPort, metricsPort)
argocd.Run(ctx, lns)
cancel()
if closer != nil {
closer()
Expand Down
6 changes: 5 additions & 1 deletion cmd/argocd/commands/headless/headless.go
Expand Up @@ -216,7 +216,11 @@ func StartLocalServer(clientOpts *apiclient.ClientOptions, ctxStr string, port *
RepoClientset: &forwardRepoClientset{namespace: namespace, context: ctxStr},
})

go srv.Run(ctx, *port, 0)
lns, err := srv.Listen()
if err != nil {
return err
}
go srv.Run(ctx, lns)
clientOpts.ServerAddr = fmt.Sprintf("%s:%d", *address, *port)
clientOpts.PlainText = true
if !cache2.WaitForCacheSync(ctx.Done(), srv.Initialized) {
Expand Down
163 changes: 103 additions & 60 deletions server/server.go
Expand Up @@ -282,22 +282,102 @@ func (a *ArgoCDServer) healthCheck(r *http.Request) error {
return nil
}

type Listeners struct {
Main net.Listener
Metrics net.Listener
GatewayConn *grpc.ClientConn
}

func (l *Listeners) Close() error {
if l.Main != nil {
if err := l.Main.Close(); err != nil {
return err
}
l.Main = nil
}
if l.Metrics != nil {
if err := l.Metrics.Close(); err != nil {
return err
}
l.Metrics = nil
}
if l.GatewayConn != nil {
if err := l.GatewayConn.Close(); err != nil {
return err
}
l.GatewayConn = nil
}
return nil
}

func startListener(host string, port int) (net.Listener, error) {
var conn net.Listener
var realErr error
_ = wait.ExponentialBackoff(backoff, func() (bool, error) {
conn, realErr = net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
if realErr != nil {
return false, nil
}
return true, nil
})
return conn, realErr
}

func (a *ArgoCDServer) Listen() (*Listeners, error) {
mainLn, err := startListener(a.ListenHost, a.ListenPort)
if err != nil {
return nil, err
}
metricsLn, err := startListener(a.ListenHost, a.MetricsPort)
if err != nil {
io.Close(mainLn)
return nil, err
}
var dOpts []grpc.DialOption
dOpts = append(dOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(apiclient.MaxGRPCMessageSize)))
dOpts = append(dOpts, grpc.WithUserAgent(fmt.Sprintf("%s/%s", common.ArgoCDUserAgentName, common.GetVersion().Version)))
dOpts = append(dOpts, grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()))
dOpts = append(dOpts, grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()))
if a.useTLS() {
// The following sets up the dial Options for grpc-gateway to talk to gRPC server over TLS.
// grpc-gateway is just translating HTTP/HTTPS requests as gRPC requests over localhost,
// so we need to supply the same certificates to establish the connections that a normal,
// external gRPC client would need.
tlsConfig := a.settings.TLSConfig()
if a.TLSConfigCustomizer != nil {
a.TLSConfigCustomizer(tlsConfig)
}
tlsConfig.InsecureSkipVerify = true
dCreds := credentials.NewTLS(tlsConfig)
dOpts = append(dOpts, grpc.WithTransportCredentials(dCreds))
} else {
dOpts = append(dOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", a.ListenPort), dOpts...)
if err != nil {
io.Close(mainLn)
io.Close(metricsLn)
return nil, err
}
return &Listeners{Main: mainLn, Metrics: metricsLn, GatewayConn: conn}, nil
}

// Run runs the API Server
// We use k8s.io/code-generator/cmd/go-to-protobuf to generate the .proto files from the API types.
// k8s.io/ go-to-protobuf uses protoc-gen-gogo, which comes from gogo/protobuf (a fork of
// golang/protobuf).
func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) {
func (a *ArgoCDServer) Run(ctx context.Context, listeners *Listeners) {
a.userStateStorage.Init(ctx)

grpcS, appResourceTreeFn := a.newGRPCServer()
grpcWebS := grpcweb.WrapServer(grpcS)
var httpS *http.Server
var httpsS *http.Server
if a.useTLS() {
httpS = newRedirectServer(port, a.RootPath)
httpsS = a.newHTTPServer(ctx, port, grpcWebS, appResourceTreeFn)
httpS = newRedirectServer(a.ListenPort, a.RootPath)
httpsS = a.newHTTPServer(ctx, a.ListenPort, grpcWebS, appResourceTreeFn, listeners.GatewayConn)
} else {
httpS = a.newHTTPServer(ctx, port, grpcWebS, appResourceTreeFn)
httpS = a.newHTTPServer(ctx, a.ListenPort, grpcWebS, appResourceTreeFn, listeners.GatewayConn)
}
if a.RootPath != "" {
httpS.Handler = withRootPath(httpS.Handler, a)
Expand All @@ -311,26 +391,13 @@ func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) {
httpsS.Handler = &bug21955Workaround{handler: httpsS.Handler}
}

metricsServ := metrics.NewMetricsServer(a.ListenHost, metricsPort)
metricsServ := metrics.NewMetricsServer(a.ListenHost, a.MetricsPort)
if a.RedisClient != nil {
cacheutil.CollectMetrics(a.RedisClient, metricsServ)
}

// Start listener
var conn net.Listener
var realErr error
_ = wait.ExponentialBackoff(backoff, func() (bool, error) {
conn, realErr = net.Listen("tcp", fmt.Sprintf("%s:%d", a.ListenHost, port))
if realErr != nil {
a.log.Warnf("failed listen: %v", realErr)
return false, nil
}
return true, nil
})
errors.CheckError(realErr)

// CMux is used to support servicing gRPC and HTTP1.1+JSON on the same port
tcpm := cmux.New(conn)
tcpm := cmux.New(listeners.Main)
var tlsm cmux.CMux
var grpcL net.Listener
var httpL net.Listener
Expand Down Expand Up @@ -360,7 +427,7 @@ func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) {

// Start the muxed listeners for our servers
log.Infof("argocd %s serving on port %d (url: %s, tls: %v, namespace: %s, sso: %v)",
common.GetVersion(), port, a.settings.URL, a.useTLS(), a.Namespace, a.settings.IsSSOConfigured())
common.GetVersion(), a.ListenPort, a.settings.URL, a.useTLS(), a.Namespace, a.settings.IsSSOConfigured())

go a.projInformer.Run(ctx.Done())
go a.appInformer.Run(ctx.Done())
Expand All @@ -374,17 +441,13 @@ func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) {
go a.watchSettings()
go a.rbacPolicyLoader(ctx)
go func() { a.checkServeErr("tcpm", tcpm.Serve()) }()
go func() { a.checkServeErr("metrics", metricsServ.ListenAndServe()) }()
go func() { a.checkServeErr("metrics", metricsServ.Serve(listeners.Metrics)) }()
if !cache.WaitForCacheSync(ctx.Done(), a.projInformer.HasSynced, a.appInformer.HasSynced) {
log.Fatal("Timed out waiting for project cache to sync")
}

a.stopCh = make(chan struct{})
<-a.stopCh
errors.CheckError(conn.Close())
if err := metricsServ.Shutdown(ctx); err != nil {
log.Fatalf("Failed to gracefully shutdown metrics server: %v", err)
}
}

func (a *ArgoCDServer) Initialized() bool {
Expand Down Expand Up @@ -702,7 +765,7 @@ func compressHandler(handler http.Handler) http.Handler {

// newHTTPServer returns the HTTP server to serve HTTP/HTTPS requests. This is implemented
// using grpc-gateway as a proxy to the gRPC server.
func (a *ArgoCDServer) newHTTPServer(ctx context.Context, port int, grpcWebHandler http.Handler, appResourceTreeFn application.AppResourceTreeFn) *http.Server {
func (a *ArgoCDServer) newHTTPServer(ctx context.Context, port int, grpcWebHandler http.Handler, appResourceTreeFn application.AppResourceTreeFn, conn *grpc.ClientConn) *http.Server {
endpoint := fmt.Sprintf("localhost:%d", port)
mux := http.NewServeMux()
httpS := http.Server{
Expand All @@ -718,26 +781,6 @@ func (a *ArgoCDServer) newHTTPServer(ctx context.Context, port int, grpcWebHandl
},
},
}
var dOpts []grpc.DialOption
dOpts = append(dOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(apiclient.MaxGRPCMessageSize)))
dOpts = append(dOpts, grpc.WithUserAgent(fmt.Sprintf("%s/%s", common.ArgoCDUserAgentName, common.GetVersion().Version)))
dOpts = append(dOpts, grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()))
dOpts = append(dOpts, grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()))
if a.useTLS() {
// The following sets up the dial Options for grpc-gateway to talk to gRPC server over TLS.
// grpc-gateway is just translating HTTP/HTTPS requests as gRPC requests over localhost,
// so we need to supply the same certificates to establish the connections that a normal,
// external gRPC client would need.
tlsConfig := a.settings.TLSConfig()
if a.TLSConfigCustomizer != nil {
a.TLSConfigCustomizer(tlsConfig)
}
tlsConfig.InsecureSkipVerify = true
dCreds := credentials.NewTLS(tlsConfig)
dOpts = append(dOpts, grpc.WithTransportCredentials(dCreds))
} else {
dOpts = append(dOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

// HTTP 1.1+JSON Server
// grpc-ecosystem/grpc-gateway is used to proxy HTTP requests to the corresponding gRPC call
Expand Down Expand Up @@ -790,17 +833,17 @@ func (a *ArgoCDServer) newHTTPServer(ctx context.Context, port int, grpcWebHandl
terminalHandler.ServeHTTP(writer, request)
})

mustRegisterGWHandler(versionpkg.RegisterVersionServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(clusterpkg.RegisterClusterServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(applicationpkg.RegisterApplicationServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(repositorypkg.RegisterRepositoryServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(repocredspkg.RegisterRepoCredsServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(sessionpkg.RegisterSessionServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(settingspkg.RegisterSettingsServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(projectpkg.RegisterProjectServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(accountpkg.RegisterAccountServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(certificatepkg.RegisterCertificateServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(gpgkeypkg.RegisterGPGKeyServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dOpts)
mustRegisterGWHandler(versionpkg.RegisterVersionServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(clusterpkg.RegisterClusterServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(applicationpkg.RegisterApplicationServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(repositorypkg.RegisterRepositoryServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(repocredspkg.RegisterRepoCredsServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(sessionpkg.RegisterSessionServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(settingspkg.RegisterSettingsServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(projectpkg.RegisterProjectServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(accountpkg.RegisterAccountServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(certificatepkg.RegisterCertificateServiceHandler, ctx, gwmux, conn)
mustRegisterGWHandler(gpgkeypkg.RegisterGPGKeyServiceHandler, ctx, gwmux, conn)

// Swagger UI
swagger.ServeSwaggerUI(mux, assets.SwaggerJSON, "/swagger-ui", a.RootPath)
Expand Down Expand Up @@ -968,11 +1011,11 @@ func isMainJsBundle(url *url.URL) bool {
return mainJsBundleRegex.Match([]byte(filename))
}

type registerFunc func(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error
type registerFunc func(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error

// mustRegisterGWHandler is a convenience function to register a gateway handler
func mustRegisterGWHandler(register registerFunc, ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) {
err := register(ctx, mux, endpoint, opts)
func mustRegisterGWHandler(register registerFunc, ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) {
err := register(ctx, mux, conn)
if err != nil {
panic(err)
}
Expand Down
48 changes: 16 additions & 32 deletions server/server_norace_test.go
Expand Up @@ -29,20 +29,16 @@ func TestUserAgent(t *testing.T) {

s, closer := fakeServer()
defer closer()
lns, err := s.Listen()
assert.NoError(t, err)

cancelInformer := test.StartInformer(s.projInformer)
defer cancelInformer()
port, err := test.GetFreePort()
assert.NoError(t, err)
metricsPort, err := test.GetFreePort()
assert.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go s.Run(ctx, port, metricsPort)
go s.Run(ctx, lns)
defer func() { time.Sleep(3 * time.Second) }()

err = test.WaitForPortListen(fmt.Sprintf("127.0.0.1:%d", port), 10*time.Second)
assert.NoError(t, err)

type testData struct {
userAgent string
errorMsg string
Expand Down Expand Up @@ -72,7 +68,7 @@ func TestUserAgent(t *testing.T) {

for _, test := range tests {
opts := apiclient.ClientOptions{
ServerAddr: fmt.Sprintf("localhost:%d", port),
ServerAddr: fmt.Sprintf("localhost:%d", s.ListenPort),
PlainText: true,
UserAgent: test.userAgent,
}
Expand All @@ -99,25 +95,20 @@ func Test_StaticHeaders(t *testing.T) {
{
s, closer := fakeServer()
defer closer()
lns, err := s.Listen()
assert.NoError(t, err)
cancelInformer := test.StartInformer(s.projInformer)
defer cancelInformer()
port, err := test.GetFreePort()
assert.NoError(t, err)
metricsPort, err := test.GetFreePort()
assert.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go s.Run(ctx, port, metricsPort)
go s.Run(ctx, lns)
defer func() { time.Sleep(3 * time.Second) }()

err = test.WaitForPortListen(fmt.Sprintf("127.0.0.1:%d", port), 10*time.Second)
assert.NoError(t, err)

// Allow server startup
time.Sleep(1 * time.Second)

client := http.Client{}
url := fmt.Sprintf("http://127.0.0.1:%d/test.html", port)
url := fmt.Sprintf("http://127.0.0.1:%d/test.html", s.ListenPort)
req, err := http.NewRequest("GET", url, nil)
assert.NoError(t, err)
resp, err := client.Do(req)
Expand All @@ -134,23 +125,18 @@ func Test_StaticHeaders(t *testing.T) {
s.ContentSecurityPolicy = "frame-ancestors 'none';"
cancelInformer := test.StartInformer(s.projInformer)
defer cancelInformer()
port, err := test.GetFreePort()
assert.NoError(t, err)
metricsPort, err := test.GetFreePort()
lns, err := s.Listen()
assert.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go s.Run(ctx, port, metricsPort)
go s.Run(ctx, lns)
defer func() { time.Sleep(3 * time.Second) }()

err = test.WaitForPortListen(fmt.Sprintf("127.0.0.1:%d", port), 10*time.Second)
assert.NoError(t, err)

// Allow server startup
time.Sleep(1 * time.Second)

client := http.Client{}
url := fmt.Sprintf("http://127.0.0.1:%d/test.html", port)
url := fmt.Sprintf("http://127.0.0.1:%d/test.html", s.ListenPort)
req, err := http.NewRequest("GET", url, nil)
assert.NoError(t, err)
resp, err := client.Do(req)
Expand All @@ -167,23 +153,21 @@ func Test_StaticHeaders(t *testing.T) {
s.ContentSecurityPolicy = ""
cancelInformer := test.StartInformer(s.projInformer)
defer cancelInformer()
port, err := test.GetFreePort()
assert.NoError(t, err)
metricsPort, err := test.GetFreePort()
lns, err := s.Listen()
assert.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go s.Run(ctx, port, metricsPort)
go s.Run(ctx, lns)
defer func() { time.Sleep(3 * time.Second) }()

err = test.WaitForPortListen(fmt.Sprintf("127.0.0.1:%d", port), 10*time.Second)
err = test.WaitForPortListen(fmt.Sprintf("127.0.0.1:%d", s.ListenPort), 10*time.Second)
assert.NoError(t, err)

// Allow server startup
time.Sleep(1 * time.Second)

client := http.Client{}
url := fmt.Sprintf("http://127.0.0.1:%d/test.html", port)
url := fmt.Sprintf("http://127.0.0.1:%d/test.html", s.ListenPort)
req, err := http.NewRequest("GET", url, nil)
assert.NoError(t, err)
resp, err := client.Do(req)
Expand Down