diff --git a/core/service/serviceconf.go b/core/service/serviceconf.go index c3019de7c623..2bf9a45011e3 100644 --- a/core/service/serviceconf.go +++ b/core/service/serviceconf.go @@ -9,6 +9,7 @@ import ( "github.com/zeromicro/go-zero/core/prometheus" "github.com/zeromicro/go-zero/core/stat" "github.com/zeromicro/go-zero/core/trace" + "github.com/zeromicro/go-zero/internal/devserver" ) const ( @@ -28,10 +29,12 @@ const ( type ServiceConf struct { Name string Log logx.LogConf - Mode string `json:",default=pro,options=dev|test|rt|pre|pro"` - MetricsUrl string `json:",optional"` + Mode string `json:",default=pro,options=dev|test|rt|pre|pro"` + MetricsUrl string `json:",optional"` + // Deprecated: please use DevServer Prometheus prometheus.Config `json:",optional"` Telemetry trace.Config `json:",optional"` + DevServer devserver.Config `json:",optional"` } // MustSetUp sets up the service, exits on error. @@ -64,6 +67,7 @@ func (sc ServiceConf) SetUp() error { if len(sc.MetricsUrl) > 0 { stat.SetReportWriter(stat.NewRemoteWriter(sc.MetricsUrl)) } + devserver.StartAgent(sc.DevServer) return nil } diff --git a/go.mod b/go.mod index 07ade0f5bfae..02ca6b16bc54 100644 --- a/go.mod +++ b/go.mod @@ -48,6 +48,7 @@ require ( ) require ( + github.com/felixge/fgprof v0.9.3 github.com/google/gofuzz v1.2.0 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect diff --git a/go.sum b/go.sum index c4fb0aeac9ce..c4d84b0ca0c8 100644 --- a/go.sum +++ b/go.sum @@ -453,6 +453,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= +github.com/felixge/fgprof v0.9.3 
h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g= +github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= @@ -582,6 +584,8 @@ github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20211214055906-6f57359322fd h1:1FjCyPC+syAzJ5/2S8fqdZK1R22vvA0J7JZKcuOIQ7Y= +github.com/google/pprof v0.0.0-20211214055906-6f57359322fd/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -623,6 +627,7 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/imdario/mergo v0.3.5/go.mod 
h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= @@ -1096,6 +1101,7 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/devserver/config.go b/internal/devserver/config.go new file mode 100644 index 000000000000..6ed3a87bf85e --- /dev/null +++ b/internal/devserver/config.go @@ -0,0 +1,12 @@ +package devserver + +// Config is config for inner http server. 
+type Config struct {
+	Enabled       bool   `json:",default=true"`     // whether to start the server, true by default per the tag
+	Host          string `json:",optional"`         // listen host, empty means every interface
+	Port          int    `json:",default=6470"`     // listen port
+	MetricsPath   string `json:",default=/metrics"` // path of the metrics handler
+	HealthPath    string `json:",default=/healthz"` // path of the health handler
+	EnableMetrics bool   `json:",default=true"`     // whether to serve MetricsPath
+	EnablePprof   bool   `json:",default=true"`     // whether to serve /debug/pprof/ and /debug/fgprof
+}
diff --git a/internal/devserver/server.go b/internal/devserver/server.go
new file mode 100644
index 000000000000..d06658b2d291
--- /dev/null
+++ b/internal/devserver/server.go
@@ -0,0 +1,108 @@
+package devserver
+
+import (
+	"encoding/json"
+	"fmt"
+	"net"
+	"net/http"
+	"net/http/pprof"
+	"sync"
+
+	"github.com/felixge/fgprof"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+
+	"github.com/zeromicro/go-zero/core/logx"
+	"github.com/zeromicro/go-zero/core/threading"
+	"github.com/zeromicro/go-zero/internal/health"
+)
+
+var (
+	// once guards StartAgent so that at most one inner http server is started per process.
+	once sync.Once
+)
+
+// Server is the inner http server, it exposes useful observability information of the app,
+// for example health check, metrics and pprof.
+type Server struct {
+	config *Config
+	server *http.ServeMux
+	// routes records every registered pattern, it is served as the index at "/".
+	routes []string
+}
+
+// NewServer returns a new inner http Server.
+// The given config is kept by reference and read when the server starts.
+func NewServer(config *Config) *Server {
+	return &Server{
+		config: config,
+		server: http.NewServeMux(),
+	}
+}
+
+// addRoutes registers the index, health, metrics and profiling handlers according to the config.
+func (s *Server) addRoutes() {
+	// route path, routes list
+	s.handleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+		// "/" is the catch-all pattern of http.ServeMux, without this guard every unknown
+		// path, e.g. a mistyped /health, is answered with 200 and the route list.
+		if r.URL.Path != "/" {
+			http.NotFound(w, r)
+			return
+		}
+
+		w.Header().Set("Content-Type", "application/json")
+		// the response is already being written, an encode failure can't be reported to the client.
+		_ = json.NewEncoder(w).Encode(s.routes)
+	})
+	// health
+	s.handleFunc(s.config.HealthPath, health.CreateHttpHandler())
+
+	// metrics
+	if s.config.EnableMetrics {
+		s.handleFunc(s.config.MetricsPath, promhttp.Handler().ServeHTTP)
+	}
+	// pprof
+	if s.config.EnablePprof {
+		// use the method value instead of asserting the concrete type behind http.Handler,
+		// the assertion panics as soon as fgprof returns any other implementation.
+		s.handleFunc("/debug/fgprof", fgprof.Handler().ServeHTTP)
+		s.handleFunc("/debug/pprof/", pprof.Index)
+		s.handleFunc("/debug/pprof/cmdline", pprof.Cmdline)
+		s.handleFunc("/debug/pprof/profile", pprof.Profile)
+		s.handleFunc("/debug/pprof/symbol", pprof.Symbol)
+		s.handleFunc("/debug/pprof/trace", pprof.Trace)
+	}
+}
+
+// handleFunc registers handler for pattern and records the pattern in the routes list.
+func (s *Server) handleFunc(pattern string, handler http.HandlerFunc) {
+	s.server.HandleFunc(pattern, handler)
+	s.routes = append(s.routes, pattern)
+}
+
+// StartAsync starts the inner http server in the background.
+// It registers the routes on every call, so call it only once per Server.
+func (s *Server) StartAsync() {
+	s.addRoutes()
+	threading.GoSafe(func() {
+		// JoinHostPort brackets IPv6 literals, "%s:%d" turns a host like ::1 into an unusable address.
+		addr := net.JoinHostPort(s.config.Host, fmt.Sprint(s.config.Port))
+		logx.Infof("Starting inner http server at %s", addr)
+		if err := http.ListenAndServe(addr, s.server); err != nil {
+			logx.Error(err)
+		}
+	})
+}
+
+// StartAgent starts the inner http server by config.
+// Only the first enabled config starts a server, a disabled config returns
+// before the once guard, so it can't block a later enabled one.
+func StartAgent(c Config) {
+	if !c.Enabled {
+		return
+	}
+
+	once.Do(func() {
+		NewServer(&c).StartAsync()
+	})
+}
diff --git a/internal/health/health.go b/internal/health/health.go
new file mode 100644
index 000000000000..8cb41b0e3368
--- /dev/null
+++ b/internal/health/health.go
@@ -0,0 +1,147 @@
+package health
+
+import (
+	"net/http"
+	"strings"
+	"sync"
+
+	"github.com/zeromicro/go-zero/core/syncx"
+)
+
+// defaultHealthManager is the global comboHealthManager of the process,
+// AddProbe registers into it and CreateHttpHandler reports from it.
+var defaultHealthManager = newComboHealthManager()
+
+type (
+	// Probe represents readiness status of given component.
+	Probe interface {
+		// MarkReady sets a ready state for the endpoint handlers.
+		MarkReady()
+		// MarkNotReady sets a not ready state for the endpoint handlers.
+		MarkNotReady()
+		// IsReady returns the inner state of the component.
+		IsReady() bool
+		// Name returns the identifier of the probe.
+		Name() string
+	}
+
+	// healthManager tracks the readiness of one named component.
+	healthManager struct {
+		ready syncx.AtomicBool
+		name  string
+	}
+
+	// comboHealthManager folds given probes into one, reflects their statuses in a thread-safe way.
+	comboHealthManager struct {
+		mu     sync.Mutex
+		probes []Probe
+	}
+)
+
+// AddProbe adds the probe of a component to the global comboHealthManager.
+func AddProbe(probe Probe) {
+	defaultHealthManager.addProbe(probe)
+}
+
+// NewHealthManager returns a Probe with the given name, it starts as not ready.
+func NewHealthManager(name string) Probe {
+	return &healthManager{
+		name: name,
+	}
+}
+
+// MarkReady sets a ready state for the endpoint handlers.
+func (h *healthManager) MarkReady() {
+	h.ready.Set(true)
+}
+
+// MarkNotReady sets a not ready state for the endpoint handlers.
+func (h *healthManager) MarkNotReady() {
+	h.ready.Set(false)
+}
+
+// IsReady returns the inner state of the component.
+func (h *healthManager) IsReady() bool {
+	return h.ready.True()
+}
+
+// Name returns the identifier of the probe.
+func (h *healthManager) Name() string {
+	return h.name
+}
+
+// newComboHealthManager returns a comboHealthManager without probes, which reports ready until one is added.
+func newComboHealthManager() *comboHealthManager {
+	return &comboHealthManager{}
+}
+
+// MarkReady sets the status of every registered component to ready.
+func (p *comboHealthManager) MarkReady() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	for _, probe := range p.probes {
+		probe.MarkReady()
+	}
+}
+
+// MarkNotReady sets the status of every registered component to not ready.
+func (p *comboHealthManager) MarkNotReady() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	for _, probe := range p.probes {
+		probe.MarkNotReady()
+	}
+}
+
+// IsReady returns the composed status, it is ready only if every registered component is ready.
+func (p *comboHealthManager) IsReady() bool {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	for _, probe := range p.probes {
+		if !probe.IsReady() {
+			return false
+		}
+	}
+	return true
+}
+
+// verboseInfo returns one line per registered component telling whether it is ready.
+func (p *comboHealthManager) verboseInfo() string {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	var info strings.Builder
+	for _, probe := range p.probes {
+		info.WriteString(probe.Name())
+		if probe.IsReady() {
+			info.WriteString(" is ready; \n")
+		} else {
+			info.WriteString(" is not ready; \n")
+		}
+	}
+	return info.String()
+}
+
+// addProbe adds the probe of a component to comboHealthManager.
+func (p *comboHealthManager) addProbe(probe Probe) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	p.probes = append(p.probes, probe)
+}
+
+// CreateHttpHandler creates the health http handler backed by the global comboHealthManager.
+// It answers 200 "OK" when every probe is ready, otherwise 503 with the state of each probe.
+func CreateHttpHandler() http.HandlerFunc {
+	return func(w http.ResponseWriter, _ *http.Request) {
+		if defaultHealthManager.IsReady() {
+			// a failed write means the client is gone, there is nobody left to report to.
+			_, _ = w.Write([]byte("OK"))
+		} else {
+			http.Error(w, "Service Unavailable\n"+defaultHealthManager.verboseInfo(), http.StatusServiceUnavailable)
+		}
+	}
+}
diff --git a/internal/health/health_test.go b/internal/health/health_test.go
new file mode 100644
index 000000000000..4a6fc0ff9e14
--- /dev/null
+++ b/internal/health/health_test.go
@@ -0,0 +1,140 @@
+package health
+
+import (
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+const probeName = "probe"
+
+func TestHealthManager(t *testing.T) {
+	hm := NewHealthManager(probeName)
+	assert.False(t, hm.IsReady())
+
+	hm.MarkReady()
+	assert.True(t, hm.IsReady())
+
+	hm.MarkNotReady()
+	assert.False(t, hm.IsReady())
+
+	t.Run("concurrent should works", func(t *testing.T) {
+		var wg sync.WaitGroup
+		wg.Add(10)
+		for i := 0; i < 10; i++ {
+			go func() {
+				hm.MarkReady()
+				wg.Done()
+			}()
+		}
+		wg.Wait()
+		assert.True(t, hm.IsReady())
+	})
+}
+
+func TestComboHealthManager(t *testing.T) {
+	t.Run("base", func(t *testing.T) {
+		chm := newComboHealthManager()
+		hm1 :=
NewHealthManager(probeName) + hm2 := NewHealthManager(probeName + "2") + + assert.True(t, chm.IsReady()) + chm.addProbe(hm1) + chm.addProbe(hm2) + assert.False(t, chm.IsReady()) + hm1.MarkReady() + assert.False(t, chm.IsReady()) + hm2.MarkReady() + assert.True(t, chm.IsReady()) + }) + + t.Run("concurrent add probes", func(t *testing.T) { + chm2 := newComboHealthManager() + + var wg sync.WaitGroup + wg.Add(10) + for i := 0; i < 10; i++ { + go func() { + hm := NewHealthManager(probeName) + hm.MarkReady() + chm2.addProbe(hm) + wg.Done() + }() + } + wg.Wait() + assert.True(t, chm2.IsReady()) + }) + + t.Run("markReady and markNotReady", func(t *testing.T) { + chm2 := newComboHealthManager() + + for i := 0; i < 10; i++ { + hm := NewHealthManager(probeName) + chm2.addProbe(hm) + } + assert.False(t, chm2.IsReady()) + + chm2.MarkReady() + assert.True(t, chm2.IsReady()) + + chm2.MarkNotReady() + assert.False(t, chm2.IsReady()) + }) +} + +func TestAddGlobalProbes(t *testing.T) { + cleanupForTest(t) + + t.Run("concurrent add probes", func(t *testing.T) { + var wg sync.WaitGroup + wg.Add(10) + for i := 0; i < 10; i++ { + go func() { + hm := NewHealthManager(probeName) + hm.MarkReady() + AddProbe(hm) + wg.Done() + }() + } + wg.Wait() + assert.True(t, defaultHealthManager.IsReady()) + }) +} + +func TestCreateHttpHandler(t *testing.T) { + cleanupForTest(t) + srv := httptest.NewServer(CreateHttpHandler()) + defer srv.Close() + + resp, err := http.Get(srv.URL) + assert.Nil(t, err) + _ = resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + hm := NewHealthManager(probeName) + defaultHealthManager.addProbe(hm) + + resp, err = http.Get(srv.URL) + assert.Nil(t, err) + assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode) + content, _ := io.ReadAll(resp.Body) + assert.True(t, strings.HasPrefix(string(content), "Service Unavailable")) + _ = resp.Body.Close() + + hm.MarkReady() + resp, err = http.Get(srv.URL) + assert.Nil(t, err) + _ = resp.Body.Close() + 
assert.Equal(t, http.StatusOK, resp.StatusCode) +} + +func cleanupForTest(t *testing.T) { + t.Cleanup(func() { + defaultHealthManager = &comboHealthManager{} + }) +} diff --git a/rest/internal/starter.go b/rest/internal/starter.go index 56198380c7a9..6ac180d4d566 100644 --- a/rest/internal/starter.go +++ b/rest/internal/starter.go @@ -7,8 +7,11 @@ import ( "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/proc" + "github.com/zeromicro/go-zero/internal/health" ) +const probeNamePrefix = "rest" + // StartOption defines the method to customize http.Server. type StartOption func(svr *http.Server) @@ -37,8 +40,10 @@ func start(host string, port int, handler http.Handler, run func(svr *http.Serve for _, opt := range opts { opt(server) } + healthManager := health.NewHealthManager(fmt.Sprintf("%s-%s:%d", probeNamePrefix, host, port)) waitForCalled := proc.AddWrapUpListener(func() { + healthManager.MarkNotReady() if e := server.Shutdown(context.Background()); e != nil { logx.Error(e) } @@ -49,5 +54,7 @@ func start(host string, port int, handler http.Handler, run func(svr *http.Serve } }() + healthManager.MarkReady() + health.AddProbe(healthManager) return run(server) } diff --git a/zrpc/internal/rpcserver.go b/zrpc/internal/rpcserver.go index cf9c8ce82795..3d06702325f2 100644 --- a/zrpc/internal/rpcserver.go +++ b/zrpc/internal/rpcserver.go @@ -1,15 +1,20 @@ package internal import ( + "fmt" "net" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" + "github.com/zeromicro/go-zero/core/proc" "github.com/zeromicro/go-zero/core/stat" + "github.com/zeromicro/go-zero/internal/health" "github.com/zeromicro/go-zero/zrpc/internal/serverinterceptors" - "google.golang.org/grpc" - "google.golang.org/grpc/health/grpc_health_v1" ) +const probeNamePrefix = "zrpc" + type ( // ServerOption defines the method to customize a rpcServerOptions. 
ServerOption func(options *rpcServerOptions) @@ -22,6 +27,7 @@ type ( rpcServer struct { name string *baseRpcServer + healthManager health.Probe } ) @@ -37,6 +43,7 @@ func NewRpcServer(address string, opts ...ServerOption) Server { return &rpcServer{ baseRpcServer: newBaseRpcServer(address, &options), + healthManager: health.NewHealthManager(fmt.Sprintf("%s-%s", probeNamePrefix, address)), } } @@ -75,6 +82,8 @@ func (s *rpcServer) Start(register RegisterFn) error { grpc_health_v1.RegisterHealthServer(server, s.health) s.health.Resume() } + s.healthManager.MarkReady() + health.AddProbe(s.healthManager) // we need to make sure all others are wrapped up, // so we do graceful stop at shutdown phase instead of wrap up phase