From 7c2b4087b11d8ba83f1597180d7c4f60fb9e6a99 Mon Sep 17 00:00:00 2001 From: Matthew Holt Date: Wed, 24 Nov 2021 11:43:32 -0700 Subject: [PATCH 01/14] reverseproxy: Begin refactor to enable dynamic upstreams Streamed here: https://www.youtube.com/watch?v=hj7yzXb11jU --- modules/caddyhttp/reverseproxy/admin.go | 2 +- .../caddyhttp/reverseproxy/healthchecks.go | 4 +- modules/caddyhttp/reverseproxy/hosts.go | 64 +++++-------------- .../caddyhttp/reverseproxy/reverseproxy.go | 39 +++++++++-- .../reverseproxy/selectionpolicies_test.go | 32 +++++----- 5 files changed, 69 insertions(+), 72 deletions(-) diff --git a/modules/caddyhttp/reverseproxy/admin.go b/modules/caddyhttp/reverseproxy/admin.go index 25685a3a302..586b799f14e 100644 --- a/modules/caddyhttp/reverseproxy/admin.go +++ b/modules/caddyhttp/reverseproxy/admin.go @@ -87,7 +87,7 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er return false } - upstream, ok := val.(*upstreamHost) + upstream, ok := val.(*Host) if !ok { rangeErr = caddy.APIError{ HTTPStatus: http.StatusInternalServerError, diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go index 230bf3aaf94..7409e3ed417 100644 --- a/modules/caddyhttp/reverseproxy/healthchecks.go +++ b/modules/caddyhttp/reverseproxy/healthchecks.go @@ -213,7 +213,7 @@ func (h *Handler) doActiveHealthCheckForAllHosts() { // according to whether it passes the health check. An error is // returned only if the health check fails to occur or if marking // the host's health status fails. -func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host Host) error { +func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host *Host) error { // create the URL for the request that acts as a health check scheme := "http" if ht, ok := h.Transport.(TLSTransport); ok && ht.TLSEnabled() { @@ -375,7 +375,7 @@ func (h *Handler) countFailure(upstream *Upstream) { } // forget it later - go func(host Host, failDuration time.Duration) { + go func(host *Host, failDuration time.Duration) { defer func() { if err := recover(); err != nil { log.Printf("[PANIC] health check failure forgetter: %v\n%s", err, debug.Stack()) diff --git a/modules/caddyhttp/reverseproxy/hosts.go b/modules/caddyhttp/reverseproxy/hosts.go index b9817d2370d..29f89359312 100644 --- a/modules/caddyhttp/reverseproxy/hosts.go +++ b/modules/caddyhttp/reverseproxy/hosts.go @@ -26,44 +26,13 @@ import ( "github.com/caddyserver/caddy/v2/modules/caddyhttp" ) -// Host represents a remote host which can be proxied to. -// Its methods must be safe for concurrent use. -type Host interface { - // NumRequests returns the number of requests - // currently in process with the host. - NumRequests() int - - // Fails returns the count of recent failures. - Fails() int - - // Unhealthy returns true if the backend is unhealthy. - Unhealthy() bool - - // CountRequest atomically counts the given number of - // requests as currently in process with the host. The - // count should not go below 0. - CountRequest(int) error - - // CountFail atomically counts the given number of - // failures with the host. The count should not go - // below 0. - CountFail(int) error - - // SetHealthy atomically marks the host as either - // healthy (true) or unhealthy (false). If the given - // status is the same, this should be a no-op and - // return false. It returns true if the status was - // changed; i.e. if it is now different from before. 
- SetHealthy(bool) (bool, error) -} - // UpstreamPool is a collection of upstreams. type UpstreamPool []*Upstream // Upstream bridges this proxy's configuration to the // state of the backend host it is correlated with. type Upstream struct { - Host `json:"-"` + *Host `json:"-"` // The [network address](/docs/conventions#network-addresses) // to dial to connect to the upstream. Must represent precisely @@ -174,34 +143,33 @@ func (u *Upstream) fillDialInfo(r *http.Request) (DialInfo, error) { }, nil } -// upstreamHost is the basic, in-memory representation -// of the state of a remote host. It implements the -// Host interface. -type upstreamHost struct { +// Host is the basic, in-memory representation +// of the state of a remote host. +type Host struct { numRequests int64 // must be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG) fails int64 unhealthy int32 } // NumRequests returns the number of active requests to the upstream. -func (uh *upstreamHost) NumRequests() int { - return int(atomic.LoadInt64(&uh.numRequests)) +func (h *Host) NumRequests() int { + return int(atomic.LoadInt64(&h.numRequests)) } // Fails returns the number of recent failures with the upstream. -func (uh *upstreamHost) Fails() int { - return int(atomic.LoadInt64(&uh.fails)) +func (h *Host) Fails() int { + return int(atomic.LoadInt64(&h.fails)) } // Unhealthy returns whether the upstream is healthy. -func (uh *upstreamHost) Unhealthy() bool { - return atomic.LoadInt32(&uh.unhealthy) == 1 +func (h *Host) Unhealthy() bool { + return atomic.LoadInt32(&h.unhealthy) == 1 } // CountRequest mutates the active request count by // delta. It returns an error if the adjustment fails. -func (uh *upstreamHost) CountRequest(delta int) error { - result := atomic.AddInt64(&uh.numRequests, int64(delta)) +func (h *Host) CountRequest(delta int) error { + result := atomic.AddInt64(&h.numRequests, int64(delta)) if result < 0 { return fmt.Errorf("count below 0: %d", result) } @@ -210,8 +178,8 @@ func (uh *upstreamHost) CountRequest(delta int) error { // CountFail mutates the recent failures count by // delta. It returns an error if the adjustment fails. -func (uh *upstreamHost) CountFail(delta int) error { - result := atomic.AddInt64(&uh.fails, int64(delta)) +func (h *Host) CountFail(delta int) error { + result := atomic.AddInt64(&h.fails, int64(delta)) if result < 0 { return fmt.Errorf("count below 0: %d", result) } @@ -220,12 +188,12 @@ func (uh *upstreamHost) CountFail(delta int) error { // SetHealthy sets the upstream has healthy or unhealthy // and returns true if the new value is different. -func (uh *upstreamHost) SetHealthy(healthy bool) (bool, error) { +func (h *Host) SetHealthy(healthy bool) (bool, error) { var unhealthy, compare int32 = 1, 0 if healthy { unhealthy, compare = 0, 1 } - swapped := atomic.CompareAndSwapInt32(&uh.unhealthy, compare, unhealthy) + swapped := atomic.CompareAndSwapInt32(&h.unhealthy, compare, unhealthy) return swapped, nil } diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index eaa7cbf928e..82cd3ad1029 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -81,6 +81,9 @@ type Handler struct { // Upstreams is the list of backends to proxy to. 
Upstreams UpstreamPool `json:"upstreams,omitempty"` + // TODO: godoc + DynamicUpstreamsRaw json.RawMessage `json:"dynamic_upstreams,omitempty" caddy:"namespace=http.reverse_proxy.upstreams inline_key=source"` + // Adjusts how often to flush the response buffer. By default, // no periodic flushing is done. A negative value disables // response buffering, and flushes immediately after each @@ -130,8 +133,9 @@ type Handler struct { // - `{http.reverse_proxy.header.*}` The headers from the response HandleResponse []caddyhttp.ResponseHandler `json:"handle_response,omitempty"` - Transport http.RoundTripper `json:"-"` - CB CircuitBreaker `json:"-"` + Transport http.RoundTripper `json:"-"` + CB CircuitBreaker `json:"-"` + DynamicUpstreams UpstreamSource `json:"-"` // Holds the named response matchers from the Caddyfile while adapting responseMatchers map[string]caddyhttp.ResponseMatcher @@ -191,6 +195,13 @@ func (h *Handler) Provision(ctx caddy.Context) error { } h.CB = mod.(CircuitBreaker) } + if h.DynamicUpstreamsRaw != nil { + mod, err := ctx.LoadModule(h, "DynamicUpstreamsRaw") + if err != nil { + return fmt.Errorf("loading upstream source module: %v", err) + } + h.DynamicUpstreams = mod.(UpstreamSource) + } // ensure any embedded headers handler module gets provisioned // (see https://caddy.community/t/set-cookie-manipulation-in-reverse-proxy/7666?u=matt @@ -238,10 +249,10 @@ func (h *Handler) Provision(ctx caddy.Context) error { // set up upstreams for _, upstream := range h.Upstreams { // create or get the host representation for this upstream - var host Host = new(upstreamHost) + host := new(Host) existingHost, loaded := hosts.LoadOrStore(upstream.String(), host) if loaded { - host = existingHost.(Host) + host = existingHost.(*Host) } upstream.Host = host @@ -413,10 +424,21 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht repl.Set("http.reverse_proxy.duration", time.Since(start)) }() + // get the list of upstreams + upstreams := h.Upstreams + if h.DynamicUpstreams != nil { + dUpstreams, err := h.DynamicUpstreams.GetUpstreams(r) + if err != nil { + h.logger.Error("failed getting dynamic upstreams; falling back to static upstreams", zap.Error(err)) + } else { + upstreams = dUpstreams + } + } + var proxyErr error for { // choose an available upstream - upstream := h.LoadBalancing.SelectionPolicy.Select(h.Upstreams, r, w) + upstream := h.LoadBalancing.SelectionPolicy.Select(upstreams, r, w) if upstream == nil { if proxyErr == nil { proxyErr = fmt.Errorf("no upstreams available") @@ -929,6 +951,13 @@ type Selector interface { Select(UpstreamPool, *http.Request, http.ResponseWriter) *Upstream } +// UpstreamSource gets the list of upstreams that can be used when +// proxying a request. Returned upstreams will be load balanced and +// health-checked. +type UpstreamSource interface { + GetUpstreams(*http.Request) ([]*Upstream, error) +} + // Hop-by-hop headers. These are removed when sent to the backend. // As of RFC 7230, hop-by-hop headers are required to appear in the // Connection header field. 
These are the headers defined by the
diff --git a/modules/caddyhttp/reverseproxy/selectionpolicies_test.go b/modules/caddyhttp/reverseproxy/selectionpolicies_test.go
index c28799d4267..9cb205307dd 100644
--- a/modules/caddyhttp/reverseproxy/selectionpolicies_test.go
+++ b/modules/caddyhttp/reverseproxy/selectionpolicies_test.go
@@ -22,9 +22,9 @@ import (
 
 func testPool() UpstreamPool {
 	return UpstreamPool{
-		{Host: new(upstreamHost)},
-		{Host: new(upstreamHost)},
-		{Host: new(upstreamHost)},
+		{Host: new(Host)},
+		{Host: new(Host)},
+		{Host: new(Host)},
 	}
 }
 
@@ -167,8 +167,8 @@ func TestIPHashPolicy(t *testing.T) {
 	// We should be able to resize the host pool and still be able to predict
 	// where a req will be routed with the same IP's used above
 	pool = UpstreamPool{
-		{Host: new(upstreamHost)},
-		{Host: new(upstreamHost)},
+		{Host: new(Host)},
+		{Host: new(Host)},
 	}
 	req.RemoteAddr = "172.0.0.1:80"
 	h = ipHash.Select(pool, req, nil)
@@ -201,15 +201,15 @@ func TestIPHashPolicy(t *testing.T) {
 
 	// Reproduce #4135
 	pool = UpstreamPool{
-		{Host: new(upstreamHost)},
-		{Host: new(upstreamHost)},
-		{Host: new(upstreamHost)},
-		{Host: new(upstreamHost)},
-		{Host: new(upstreamHost)},
-		{Host: new(upstreamHost)},
-		{Host: new(upstreamHost)},
-		{Host: new(upstreamHost)},
-		{Host: new(upstreamHost)},
+		{Host: new(Host)},
+		{Host: new(Host)},
+		{Host: new(Host)},
+		{Host: new(Host)},
+		{Host: new(Host)},
+		{Host: new(Host)},
+		{Host: new(Host)},
+		{Host: new(Host)},
+		{Host: new(Host)},
 	}
 	pool[0].SetHealthy(false)
 	pool[1].SetHealthy(false)
@@ -271,8 +271,8 @@ func TestURIHashPolicy(t *testing.T) {
 	// We should be able to resize the host pool and still be able to predict
 	// where a request will be routed with the same URI's used above
 	pool = UpstreamPool{
-		{Host: new(upstreamHost)},
-		{Host: new(upstreamHost)},
+		{Host: new(Host)},
+		{Host: new(Host)},
 	}
 	request = httptest.NewRequest(http.MethodGet, "/test", nil)
 

From a30b14eac15782da426b875c593e881a5f997ff0 Mon Sep 17 00:00:00 2001
From: Matthew Holt 
Date: Wed, 8 Dec 2021 16:19:15 -0700
Subject: [PATCH 02/14] Implement SRV and A/AAAA upstream sources

Also get upstreams at every retry loop iteration instead of just once
before the loop. See #4442.

---
 modules/caddyhttp/reverseproxy/hosts.go       |   9 +
 .../caddyhttp/reverseproxy/reverseproxy.go    |  38 +--
 modules/caddyhttp/reverseproxy/upstreams.go   | 243 ++++++++++++++++++
 3 files changed, 272 insertions(+), 18 deletions(-)
 create mode 100644 modules/caddyhttp/reverseproxy/upstreams.go

diff --git a/modules/caddyhttp/reverseproxy/hosts.go b/modules/caddyhttp/reverseproxy/hosts.go
index 29f89359312..07bc7b8f823 100644
--- a/modules/caddyhttp/reverseproxy/hosts.go
+++ b/modules/caddyhttp/reverseproxy/hosts.go
@@ -143,6 +143,15 @@ func (u *Upstream) fillDialInfo(r *http.Request) (DialInfo, error) {
 	}, nil
 }
 
+func (u *Upstream) setHost() {
+	host := new(Host)
+	existingHost, loaded := hosts.LoadOrStore(u.String(), host)
+	if loaded {
+		host = existingHost.(*Host)
+	}
+	u.Host = host
+}
+
 // Host is the basic, in-memory representation
 // of the state of a remote host.
type Host struct { diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index 82cd3ad1029..7d7891493d9 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -249,12 +249,7 @@ func (h *Handler) Provision(ctx caddy.Context) error { // set up upstreams for _, upstream := range h.Upstreams { // create or get the host representation for this upstream - host := new(Host) - existingHost, loaded := hosts.LoadOrStore(upstream.String(), host) - if loaded { - host = existingHost.(*Host) - } - upstream.Host = host + upstream.setHost() // give it the circuit breaker, if any upstream.cb = h.CB @@ -424,19 +419,19 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht repl.Set("http.reverse_proxy.duration", time.Since(start)) }() - // get the list of upstreams - upstreams := h.Upstreams - if h.DynamicUpstreams != nil { - dUpstreams, err := h.DynamicUpstreams.GetUpstreams(r) - if err != nil { - h.logger.Error("failed getting dynamic upstreams; falling back to static upstreams", zap.Error(err)) - } else { - upstreams = dUpstreams - } - } - var proxyErr error for { + // get the updated list of upstreams + upstreams := h.Upstreams + if h.DynamicUpstreams != nil { + dUpstreams, err := h.DynamicUpstreams.GetUpstreams(r) + if err != nil { + h.logger.Error("failed getting dynamic upstreams; falling back to static upstreams", zap.Error(err)) + } else { + upstreams = dUpstreams + } + } + // choose an available upstream upstream := h.LoadBalancing.SelectionPolicy.Select(upstreams, r, w) if upstream == nil { @@ -953,7 +948,14 @@ type Selector interface { // UpstreamSource gets the list of upstreams that can be used when // proxying a request. Returned upstreams will be load balanced and -// health-checked. +// health-checked. This should be a very fast function -- instant +// if possible -- and the return value must be as stable as possible. +// In other words, the list of upstreams should ideally not change much +// across successive calls. If the list of upstreams changes or the +// ordering is not stable, load balancing will suffer. This function +// may be called during each retry, multiple times per request, and as +// such, needs to be instantaneous. The returned slice will not be +// modified. type UpstreamSource interface { GetUpstreams(*http.Request) ([]*Upstream, error) } diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go new file mode 100644 index 00000000000..37bda336628 --- /dev/null +++ b/modules/caddyhttp/reverseproxy/upstreams.go @@ -0,0 +1,243 @@ +package reverseproxy + +import ( + "fmt" + "net" + "net/http" + "strconv" + "sync" + "time" + + "github.com/caddyserver/caddy/v2" +) + +func init() { + caddy.RegisterModule(SRVUpstreams{}) + caddy.RegisterModule(AUpstreams{}) +} + +// SRVUpstreams provides upstreams from SRV lookups. +// The lookup DNS name can be configured either by +// its individual parts (that is, specifying the +// service, protocol, and name separately) to form +// the standard "_service._proto.name" domain, or +// the domain can be specified directly in name by +// leaving service and proto empty. See RFC 2782. +// +// Lookups are cached and refreshed at the configured +// refresh interval. +// +// Returned upstreams are sorted by priority and weight. +type SRVUpstreams struct { + // The interval at which to refresh the SRV lookup. + // Results are cached between lookups. 
Default: 1m
+	Refresh time.Duration `json:"refresh,omitempty"`
+
+	// The service label.
+	Service string `json:"service,omitempty"`
+
+	// The protocol label; either tcp or udp.
+	Proto string `json:"proto,omitempty"`
+
+	// The name label; or, if service and proto are
+	// empty, the entire domain name to look up.
+	Name string `json:"name,omitempty"`
+}
+
+// CaddyModule returns the Caddy module information.
+func (SRVUpstreams) CaddyModule() caddy.ModuleInfo {
+	return caddy.ModuleInfo{
+		ID:  "http.reverse_proxy.upstreams.srv",
+		New: func() caddy.Module { return new(SRVUpstreams) },
+	}
+}
+
+// String returns the RFC 2782 representation of the SRV domain.
+func (su SRVUpstreams) String() string {
+	return fmt.Sprintf("_%s._%s.%s", su.Service, su.Proto, su.Name)
+}
+
+func (su *SRVUpstreams) Provision(_ caddy.Context) error {
+	if su.Proto != "tcp" && su.Proto != "udp" {
+		return fmt.Errorf("invalid proto '%s'", su.Proto)
+	}
+	if su.Refresh == 0 {
+		su.Refresh = time.Minute
+	}
+	return nil
+}
+
+func (su SRVUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
+	suStr := su.String()
+
+	// first, use a cheap read-lock to return a cached result quickly
+	srvsMu.RLock()
+	cached := srvs[suStr]
+	srvsMu.RUnlock()
+	if cached.fresh() {
+		return cached.upstreams, nil
+	}
+
+	// otherwise, obtain a write-lock to update the cached value
+	srvsMu.Lock()
+	defer srvsMu.Unlock()
+
+	// check to see if it's still stale, since we're now in a different
+	// lock from when we first checked freshness; another goroutine might
+	// have refreshed it in the meantime before we re-obtained our lock
+	cached = srvs[suStr]
+	if cached.fresh() {
+		return cached.upstreams, nil
+	}
+
+	// prepare parameters and perform the SRV lookup
+	repl := r.Context().Value(caddy.ReplacerCtxKey).(*caddy.Replacer)
+	service := repl.ReplaceAll(su.Service, "")
+	proto := repl.ReplaceAll(su.Proto, "")
+	name := repl.ReplaceAll(su.Name, "")
+
+	_, records, err := net.DefaultResolver.LookupSRV(r.Context(), service, proto, name)
+	if err != nil {
+		return nil, err
+	}
+
+	upstreams := make([]*Upstream, len(records))
+	for i, rec := range records {
+		upstreams[i] = &Upstream{
+			Dial: net.JoinHostPort(rec.Target, strconv.Itoa(int(rec.Port))),
+		}
+		upstreams[i].setHost()
+	}
+
+	// TODO: expire these somehow
+	srvs[suStr] = srvLookup{
+		srvUpstreams: su,
+		freshness:    time.Now(),
+		upstreams:    upstreams,
+	}
+
+	return upstreams, nil
+}
+
+type srvLookup struct {
+	srvUpstreams SRVUpstreams
+	freshness    time.Time
+	upstreams    []*Upstream
+}
+
+func (sl srvLookup) fresh() bool {
+	return time.Since(sl.freshness) < sl.srvUpstreams.Refresh
+}
+
+var (
+	srvs   = make(map[string]srvLookup)
+	srvsMu sync.RWMutex
+)
+
+// AUpstreams provides upstreams from A/AAAA lookups.
+// Results are cached and refreshed at the configured
+// refresh interval.
+type AUpstreams struct {
+	// The domain name to look up.
+	Name string `json:"name,omitempty"`
+
+	// The port to use with the upstreams. Default: 80
+	Port string `json:"port,omitempty"`
+
+	// The interval at which to refresh the A/AAAA lookup.
+	// Results are cached between lookups. Default: 1m
+	Refresh time.Duration `json:"refresh,omitempty"`
+}
+
+// CaddyModule returns the Caddy module information.
+func (AUpstreams) CaddyModule() caddy.ModuleInfo {
+	return caddy.ModuleInfo{
+		ID:  "http.reverse_proxy.upstreams.a",
+		New: func() caddy.Module { return new(AUpstreams) },
+	}
+}
+
+func (au AUpstreams) String() string { return au.Name }
+
+func (au *AUpstreams) Provision(_ caddy.Context) error {
+	if au.Refresh == 0 {
+		au.Refresh = time.Minute
+	}
+	if au.Port == "" {
+		au.Port = "80"
+	}
+	return nil
+}
+
+func (au AUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
+	auStr := au.String()
+
+	// first, use a cheap read-lock to return a cached result quickly
+	aAaaaMu.RLock()
+	cached := aAaaa[auStr]
+	aAaaaMu.RUnlock()
+	if cached.fresh() {
+		return cached.upstreams, nil
+	}
+
+	// otherwise, obtain a write-lock to update the cached value
+	aAaaaMu.Lock()
+	defer aAaaaMu.Unlock()
+
+	// check to see if it's still stale, since we're now in a different
+	// lock from when we first checked freshness; another goroutine might
+	// have refreshed it in the meantime before we re-obtained our lock
+	cached = aAaaa[auStr]
+	if cached.fresh() {
+		return cached.upstreams, nil
+	}
+
+	repl := r.Context().Value(caddy.ReplacerCtxKey).(*caddy.Replacer)
+	name := repl.ReplaceAll(au.Name, "")
+	port := repl.ReplaceAll(au.Port, "")
+
+	ips, err := net.DefaultResolver.LookupIPAddr(r.Context(), name)
+	if err != nil {
+		return nil, err
+	}
+
+	upstreams := make([]*Upstream, len(ips))
+	for i, ip := range ips {
+		upstreams[i] = &Upstream{
+			Dial: net.JoinHostPort(ip.String(), port),
+		}
+		upstreams[i].setHost()
+	}
+
+	// TODO: expire these somehow
+	aAaaa[auStr] = aLookup{
+		aUpstreams: au,
+		freshness:  time.Now(),
+		upstreams:  upstreams,
+	}
+
+	return upstreams, nil
+}
+
+type aLookup struct {
+	aUpstreams AUpstreams
+	freshness  time.Time
+	upstreams  []*Upstream
+}
+
+func (al aLookup) fresh() bool {
+	return time.Since(al.freshness) < al.aUpstreams.Refresh
+}
+
+var (
+	aAaaa   = make(map[string]aLookup)
+	aAaaaMu sync.RWMutex
+)
+
+// Interface guards
+var (
+	_ caddy.Provisioner = (*SRVUpstreams)(nil)
+	_ UpstreamSource    = (*SRVUpstreams)(nil)
+	_ caddy.Provisioner = (*AUpstreams)(nil)
+	_ UpstreamSource    = (*AUpstreams)(nil)
+)
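The two GetUpstreams implementations above share a caching idiom worth noting: check freshness under a cheap read lock first, and only if the entry is stale take the write lock and re-check before doing the expensive DNS lookup, since another goroutine may have refreshed the entry in between. A self-contained sketch of that double-checked pattern (the names and the one-minute TTL are illustrative, not part of the patch):

package main

import (
	"fmt"
	"sync"
	"time"
)

type entry struct {
	value     string
	freshness time.Time
}

func (e entry) isFresh() bool { return time.Since(e.freshness) < time.Minute }

var (
	cache   = make(map[string]entry)
	cacheMu sync.RWMutex
)

// get returns the cached value for key, refreshing it with lookup only
// when stale; the re-check under the write lock prevents two goroutines
// racing on the same stale key from both performing the lookup.
func get(key string, lookup func() string) string {
	cacheMu.RLock()
	cached := cache[key]
	cacheMu.RUnlock()
	if cached.isFresh() {
		return cached.value
	}

	cacheMu.Lock()
	defer cacheMu.Unlock()
	if cached = cache[key]; cached.isFresh() { // another goroutine may have refreshed it
		return cached.value
	}

	cache[key] = entry{value: lookup(), freshness: time.Now()}
	return cache[key].value
}

func main() {
	fmt.Println(get("example", func() string { return "looked up" }))
}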
From 687c9394c5b2df45b6de6d09ba0f0594197e5390 Mon Sep 17 00:00:00 2001
From: Matthew Holt 
Date: Mon, 13 Dec 2021 11:12:23 -0700
Subject: [PATCH 03/14] Minor tweaks from review

---
 modules/caddyhttp/reverseproxy/hosts.go       |  2 +-
 .../caddyhttp/reverseproxy/reverseproxy.go    |  2 +-
 modules/caddyhttp/reverseproxy/upstreams.go   | 30 ++++++++++++-------
 3 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/modules/caddyhttp/reverseproxy/hosts.go b/modules/caddyhttp/reverseproxy/hosts.go
index 07bc7b8f823..84713b16ae3 100644
--- a/modules/caddyhttp/reverseproxy/hosts.go
+++ b/modules/caddyhttp/reverseproxy/hosts.go
@@ -143,7 +143,7 @@ func (u *Upstream) fillDialInfo(r *http.Request) (DialInfo, error) {
 	}, nil
 }
 
-func (u *Upstream) setHost() {
+func (u *Upstream) fillHost() {
 	host := new(Host)
 	existingHost, loaded := hosts.LoadOrStore(u.String(), host)
 	if loaded {
diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go
index 7d7891493d9..ebb5fa802f7 100644
--- a/modules/caddyhttp/reverseproxy/reverseproxy.go
+++ b/modules/caddyhttp/reverseproxy/reverseproxy.go
@@ -249,7 +249,7 @@ func (h *Handler) Provision(ctx caddy.Context) error {
 	// set up upstreams
 	for _, upstream := range h.Upstreams {
 		// create or get the host representation for this upstream
-		upstream.setHost()
+		upstream.fillHost()
 
 		// give it the circuit breaker, if any
 		upstream.cb = h.CB
diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go
index 37bda336628..f2a41181019 100644
--- a/modules/caddyhttp/reverseproxy/upstreams.go
+++ b/modules/caddyhttp/reverseproxy/upstreams.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/caddyserver/caddy/v2"
+	"go.uber.org/zap"
 )
 
 func init() {
@@ -42,6 +43,8 @@ type SRVUpstreams struct {
 	// The name label; or, if service and proto are
 	// empty, the entire domain name to look up.
 	Name string `json:"name,omitempty"`
+
+	logger *zap.Logger
 }
 
 // CaddyModule returns the Caddy module information.
@@ -57,7 +60,8 @@ func (su SRVUpstreams) String() string {
 	return fmt.Sprintf("_%s._%s.%s", su.Service, su.Proto, su.Name)
}
 
-func (su *SRVUpstreams) Provision(_ caddy.Context) error {
+func (su *SRVUpstreams) Provision(ctx caddy.Context) error {
+	su.logger = ctx.Logger(su)
 	if su.Proto != "tcp" && su.Proto != "udp" {
 		return fmt.Errorf("invalid proto '%s'", su.Proto)
 	}
@@ -74,7 +78,7 @@ func (su SRVUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
 	srvsMu.RLock()
 	cached := srvs[suStr]
 	srvsMu.RUnlock()
-	if cached.fresh() {
+	if cached.isFresh() {
 		return cached.upstreams, nil
 	}
 
@@ -86,7 +90,7 @@ func (su SRVUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
 	// lock from when we first checked freshness; another goroutine might
 	// have refreshed it in the meantime before we re-obtained our lock
 	cached = srvs[suStr]
-	if cached.fresh() {
+	if cached.isFresh() {
 		return cached.upstreams, nil
 	}
 
@@ -98,7 +102,13 @@ func (su SRVUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
 
 	_, records, err := net.DefaultResolver.LookupSRV(r.Context(), service, proto, name)
 	if err != nil {
-		return nil, err
+		// From LookupSRV docs: "If the response contains invalid names, those records are filtered
+		// out and an error will be returned alongside the remaining results, if any." Thus, we
+		// only return an error if no records at all were returned.
+		if len(records) == 0 {
+			return nil, err
+		}
+		su.logger.Warn("SRV records filtered", zap.Error(err))
 	}
 
 	upstreams := make([]*Upstream, len(records))
@@ -106,7 +116,7 @@ func (su SRVUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
 		upstreams[i] = &Upstream{
 			Dial: net.JoinHostPort(rec.Target, strconv.Itoa(int(rec.Port))),
 		}
-		upstreams[i].setHost()
+		upstreams[i].fillHost()
 	}
 
 	// TODO: expire these somehow
@@ -125,7 +135,7 @@ type srvLookup struct {
 	upstreams []*Upstream
 }
 
-func (sl srvLookup) fresh() bool {
+func (sl srvLookup) isFresh() bool {
 	return time.Since(sl.freshness) < sl.srvUpstreams.Refresh
 }
 
@@ -176,7 +186,7 @@ func (au AUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
 	aAaaaMu.RLock()
 	cached := aAaaa[auStr]
 	aAaaaMu.RUnlock()
-	if cached.fresh() {
+	if cached.isFresh() {
 		return cached.upstreams, nil
 	}
 
@@ -188,7 +198,7 @@ func (au AUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
 	// lock from when we first checked freshness; another goroutine might
 	// have refreshed it in the meantime before we re-obtained our lock
 	cached = aAaaa[auStr]
-	if cached.fresh() {
+	if cached.isFresh() {
 		return cached.upstreams, nil
 	}
 
@@ -206,7 +216,7 @@ func (au AUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
 		upstreams[i] = &Upstream{
 			Dial: net.JoinHostPort(ip.String(), port),
 		}
-		upstreams[i].setHost()
+		upstreams[i].fillHost()
 	}
 
 	// TODO: expire these somehow
@@ -225,7 +235,7 @@ type aLookup struct {
 	upstreams []*Upstream
 }
 
-func (al aLookup) fresh() bool {
+func (al aLookup) isFresh() bool {
 	return time.Since(al.freshness) < al.aUpstreams.Refresh
 }
 

From ee23558c326db464d3dbb81dbc53a220d3e90678 Mon Sep 17 00:00:00 2001
From: Matthew Holt 
Date: Wed, 5 Jan 2022 13:48:02 -0700
Subject: [PATCH 04/14] Limit size of upstreams caches

---
 modules/caddyhttp/reverseproxy/upstreams.go | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go
index f2a41181019..a49be8fe82f 100644
--- a/modules/caddyhttp/reverseproxy/upstreams.go
+++ b/modules/caddyhttp/reverseproxy/upstreams.go
@@ -119,7 +119,14 @@ func (su SRVUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
 		upstreams[i].fillHost()
 	}
 
-	// TODO: expire these somehow
+	// before adding a new one to the cache (as opposed to replacing a stale one), make room if cache is full
+	if cached.freshness.IsZero() && len(srvs) >= 1000 {
+		for randomKey := range srvs {
+			delete(srvs, randomKey)
+			break
+		}
+	}
+
 	srvs[suStr] = srvLookup{
 		srvUpstreams: su,
 		freshness:    time.Now(),
@@ -219,7 +226,14 @@ func (au AUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
 		upstreams[i].fillHost()
 	}
 
-	// TODO: expire these somehow
+	// before adding a new one to the cache (as opposed to replacing a stale one), make room if cache is full
+	if cached.freshness.IsZero() && len(aAaaa) >= 1000 {
+		for randomKey := range aAaaa {
+			delete(aAaaa, randomKey)
+			break
+		}
+	}
+
 	aAaaa[auStr] = aLookup{
 		aUpstreams: au,
 		freshness:  time.Now(),
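The cache cap added in patch 04 evicts by ranging over the map and breaking after deleting the first key; because Go map iteration order is unspecified, this discards an effectively random entry without any LRU bookkeeping. The same idea in isolation (a rough sketch; the helper name and cap constant are invented for illustration):

package main

import "fmt"

const maxEntries = 1000 // cap mirroring the patch's cache limit

// putCapped inserts key into cache, first evicting one arbitrary entry
// if the cache is full; ranging over a map and breaking after the first
// key picks an unspecified (effectively random) victim.
func putCapped(cache map[string][]string, key string, value []string) {
	if _, ok := cache[key]; !ok && len(cache) >= maxEntries {
		for randomKey := range cache {
			delete(cache, randomKey)
			break
		}
	}
	cache[key] = value
}

func main() {
	cache := make(map[string][]string)
	putCapped(cache, "example.com", []string{"10.0.0.1:80"})
	fmt.Println(len(cache))
}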
From a2984011052cd0266de879ba23eec2d19fd03688 Mon Sep 17 00:00:00 2001
From: Matthew Holt 
Date: Wed, 12 Jan 2022 10:34:30 -0700
Subject: [PATCH 05/14] Add doc notes deprecating LookupSRV

---
 modules/caddyhttp/reverseproxy/hosts.go        | 4 ++++
 modules/caddyhttp/reverseproxy/reverseproxy.go | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/modules/caddyhttp/reverseproxy/hosts.go b/modules/caddyhttp/reverseproxy/hosts.go
index 84713b16ae3..cc9e7a1f9d2 100644
--- a/modules/caddyhttp/reverseproxy/hosts.go
+++ b/modules/caddyhttp/reverseproxy/hosts.go
@@ -46,6 +46,10 @@ type Upstream struct {
 	// backends is down. Also be aware of open proxy vulnerabilities.
 	Dial string `json:"dial,omitempty"`
 
+	// DEPRECATED: Use the SRVUpstreams module instead
+	// (http.reverse_proxy.upstreams.srv). This field will be
+	// removed in a future version of Caddy.
+	//
 	// If DNS SRV records are used for service discovery with this
 	// upstream, specify the DNS name for which to look up SRV
 	// records here, instead of specifying a dial address.
diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go
index ebb5fa802f7..6e9cfc61ba9 100644
--- a/modules/caddyhttp/reverseproxy/reverseproxy.go
+++ b/modules/caddyhttp/reverseproxy/reverseproxy.go
@@ -160,7 +160,7 @@ func (h *Handler) Provision(ctx caddy.Context) error {
 	h.ctx = ctx
 	h.logger = ctx.Logger(h)
 
-	// verify SRV compatibility
+	// verify SRV compatibility - TODO: LookupSRV deprecated; will be removed
 	for i, v := range h.Upstreams {
 		if v.LookupSRV == "" {
 			continue

From 39affdf1245c70903e2075e7b79adc1d74075f6e Mon Sep 17 00:00:00 2001
From: Matthew Holt 
Date: Wed, 12 Jan 2022 18:02:08 -0700
Subject: [PATCH 06/14] Provision dynamic upstreams

Still WIP, preparing to preserve health checker functionality

---
 modules/caddyhttp/reverseproxy/hosts.go       |  2 +-
 .../caddyhttp/reverseproxy/reverseproxy.go    | 62 +++++++++++--------
 modules/caddyhttp/reverseproxy/upstreams.go   |  6 +--
 3 files changed, 38 insertions(+), 32 deletions(-)

diff --git a/modules/caddyhttp/reverseproxy/hosts.go b/modules/caddyhttp/reverseproxy/hosts.go
index cc9e7a1f9d2..c8c6f42210d 100644
--- a/modules/caddyhttp/reverseproxy/hosts.go
+++ b/modules/caddyhttp/reverseproxy/hosts.go
@@ -161,7 +161,7 @@
 type Host struct {
 	numRequests int64 // must be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
 	fails       int64
-	unhealthy   int32
+	unhealthy   int32 // TODO: this should be removed from Host; this decision should be made by the health checker
 }
 
 // NumRequests returns the number of active requests to the upstream.
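For context on the Host state that patch 06 is working to reuse: fillHost binds each Upstream to a shared entry in the package-global hosts map, so every handler (and every dynamic lookup) that dials the same address observes the same atomic counters. A minimal standalone sketch of that sharing pattern (illustrative names, not the patch's actual types):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// hostState mirrors the idea of Host: counters shared by every
// upstream that points at the same dial address.
type hostState struct {
	numRequests int64 // accessed atomically
}

var hosts sync.Map // dial address -> *hostState

// stateFor returns the shared state for addr, creating it on first use.
// LoadOrStore guarantees that concurrent callers for the same address
// all end up holding the same *hostState.
func stateFor(addr string) *hostState {
	state := new(hostState)
	if existing, loaded := hosts.LoadOrStore(addr, state); loaded {
		state = existing.(*hostState)
	}
	return state
}

func main() {
	a := stateFor("localhost:8080")
	b := stateFor("localhost:8080") // same backing state as a
	atomic.AddInt64(&a.numRequests, 1)
	fmt.Println(atomic.LoadInt64(&b.numRequests)) // prints 1
}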
diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go
index 6e9cfc61ba9..5d5db964792 100644
--- a/modules/caddyhttp/reverseproxy/reverseproxy.go
+++ b/modules/caddyhttp/reverseproxy/reverseproxy.go
@@ -247,33 +247,8 @@ func (h *Handler) Provision(ctx caddy.Context) error {
 	}
 
 	// set up upstreams
-	for _, upstream := range h.Upstreams {
-		// create or get the host representation for this upstream
-		upstream.fillHost()
-
-		// give it the circuit breaker, if any
-		upstream.cb = h.CB
-
-		// if the passive health checker has a non-zero UnhealthyRequestCount
-		// but the upstream has no MaxRequests set (they are the same thing,
-		// but the passive health checker is a default value for for upstreams
-		// without MaxRequests), copy the value into this upstream, since the
-		// value in the upstream (MaxRequests) is what is used during
-		// availability checks
-		if h.HealthChecks != nil && h.HealthChecks.Passive != nil {
-			h.HealthChecks.Passive.logger = h.logger.Named("health_checker.passive")
-			if h.HealthChecks.Passive.UnhealthyRequestCount > 0 &&
-				upstream.MaxRequests == 0 {
-				upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount
-			}
-		}
-
-		// upstreams need independent access to the passive
-		// health check policy because passive health checks
-		// run without access to h.
-		if h.HealthChecks != nil {
-			upstream.healthCheckPolicy = h.HealthChecks.Passive
-		}
+	for _, u := range h.Upstreams {
+		h.provisionUpstream(u)
 	}
 
 	if h.HealthChecks != nil {
@@ -430,6 +405,10 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
 		} else {
 			upstreams = dUpstreams
 		}
+		for _, dUp := range dUpstreams {
+			// TODO: we should reuse the Host contained within upstreams...
+			h.provisionUpstream(dUp)
+		}
 	}
 
 	// choose an available upstream
@@ -838,6 +817,35 @@ func (h Handler) directRequest(req *http.Request, di DialInfo) {
 	req.URL.Host = reqHost
 }
 
+func (h Handler) provisionUpstream(upstream *Upstream) {
+	// create or get the host representation for this upstream
+	upstream.fillHost()
+
+	// give it the circuit breaker, if any
+	upstream.cb = h.CB
+
+	// if the passive health checker has a non-zero UnhealthyRequestCount
+	// but the upstream has no MaxRequests set (they are the same thing,
+	// but the passive health checker is a default value for upstreams
+	// without MaxRequests), copy the value into this upstream, since the
+	// value in the upstream (MaxRequests) is what is used during
+	// availability checks
+	if h.HealthChecks != nil && h.HealthChecks.Passive != nil {
+		h.HealthChecks.Passive.logger = h.logger.Named("health_checker.passive")
+		if h.HealthChecks.Passive.UnhealthyRequestCount > 0 &&
+			upstream.MaxRequests == 0 {
+			upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount
+		}
+	}
+
+	// upstreams need independent access to the passive
+	// health check policy because passive health checks
+	// run without access to h.
+	if h.HealthChecks != nil {
+		upstream.healthCheckPolicy = h.HealthChecks.Passive
+	}
+}
+
 // bufferedBody reads originalBody into a buffer, then returns a reader for the buffer.
 // Always close the return value when done with it, just like if it was the original body!
 func (h Handler) bufferedBody(originalBody io.ReadCloser) io.ReadCloser {
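Because provisionUpstream is now applied to whatever GetUpstreams returns, a dynamic upstream source only has to produce Upstream values with a dial address; the handler fills in the shared Host, circuit breaker, and passive health-check settings on each iteration of the proxy loop. A rough sketch of what a third-party source module could look like (the demo module ID and its Addresses field are hypothetical, invented for illustration):

package example

import (
	"net/http"

	"github.com/caddyserver/caddy/v2"
	"github.com/caddyserver/caddy/v2/modules/caddyhttp/reverseproxy"
)

func init() {
	caddy.RegisterModule(DemoUpstreams{})
}

// DemoUpstreams is a hypothetical upstream source that returns a fixed
// list of backends. A real implementation would query service discovery
// here; results should be fast and stable, since GetUpstreams runs on
// every retry iteration.
type DemoUpstreams struct {
	// Dial addresses to return. (Illustrative field.)
	Addresses []string `json:"addresses,omitempty"`
}

// CaddyModule returns the Caddy module information.
func (DemoUpstreams) CaddyModule() caddy.ModuleInfo {
	return caddy.ModuleInfo{
		ID:  "http.reverse_proxy.upstreams.demo",
		New: func() caddy.Module { return new(DemoUpstreams) },
	}
}

// GetUpstreams implements reverseproxy.UpstreamSource.
func (du DemoUpstreams) GetUpstreams(_ *http.Request) ([]*reverseproxy.Upstream, error) {
	upstreams := make([]*reverseproxy.Upstream, len(du.Addresses))
	for i, addr := range du.Addresses {
		upstreams[i] = &reverseproxy.Upstream{Dial: addr}
	}
	return upstreams, nil
}

// Interface guard
var _ reverseproxy.UpstreamSource = (*DemoUpstreams)(nil)

Such a module would be selected through the handler's dynamic_upstreams field by its inline key, e.g. "source": "demo".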
diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go
index a49be8fe82f..b602a49054c 100644
--- a/modules/caddyhttp/reverseproxy/upstreams.go
+++ b/modules/caddyhttp/reverseproxy/upstreams.go
@@ -116,11 +116,10 @@ func (su SRVUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
 		upstreams[i] = &Upstream{
 			Dial: net.JoinHostPort(rec.Target, strconv.Itoa(int(rec.Port))),
 		}
-		upstreams[i].fillHost()
 	}
 
 	// before adding a new one to the cache (as opposed to replacing a stale one), make room if cache is full
-	if cached.freshness.IsZero() && len(srvs) >= 1000 {
+	if cached.freshness.IsZero() && len(srvs) >= 100 {
 		for randomKey := range srvs {
 			delete(srvs, randomKey)
 			break
@@ -223,11 +222,10 @@ func (au AUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
 		upstreams[i] = &Upstream{
 			Dial: net.JoinHostPort(ip.String(), port),
 		}
-		upstreams[i].fillHost()
 	}
 
 	// before adding a new one to the cache (as opposed to replacing a stale one), make room if cache is full
-	if cached.freshness.IsZero() && len(aAaaa) >= 1000 {
+	if cached.freshness.IsZero() && len(aAaaa) >= 100 {
 		for randomKey := range aAaaa {
 			delete(aAaaa, randomKey)
 			break
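Patch 07 below moves active health status out of the shared Host and into each handler's own Upstream, guarded by an atomic compare-and-swap so that only the goroutine that actually flips the status reports a change. The CAS idiom in isolation (a sketch mirroring the patch's setHealthy, with illustrative type names):

package main

import (
	"fmt"
	"sync/atomic"
)

// upstreamHealth holds an unhealthy flag that is flipped atomically.
type upstreamHealth struct {
	unhealthy int32 // 0 = healthy, 1 = unhealthy; accessed atomically
}

func (u *upstreamHealth) healthy() bool {
	return atomic.LoadInt32(&u.unhealthy) == 0
}

// setHealthy returns true only if the status actually changed, so
// callers can log "host is up"/"host is down" exactly once per flip
// even when multiple health-check goroutines race.
func (u *upstreamHealth) setHealthy(healthy bool) bool {
	var unhealthy, compare int32 = 1, 0
	if healthy {
		unhealthy, compare = 0, 1
	}
	return atomic.CompareAndSwapInt32(&u.unhealthy, compare, unhealthy)
}

func main() {
	var u upstreamHealth
	fmt.Println(u.setHealthy(false)) // true: healthy -> unhealthy
	fmt.Println(u.setHealthy(false)) // false: already unhealthy
	fmt.Println(u.healthy())         // false
}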
From 08389ae50ff30a8de986b18efcdaef4879c20fcd Mon Sep 17 00:00:00 2001
From: Matthew Holt 
Date: Fri, 14 Jan 2022 16:01:28 -0700
Subject: [PATCH 07/14] Rejigger health checks

Move active health check results into handler-specific Upstreams.
Improve documentation regarding health checks and upstreams.

---
 modules/caddyhttp/reverseproxy/admin.go       |   1 -
 .../caddyhttp/reverseproxy/healthchecks.go    |  91 +++++----
 modules/caddyhttp/reverseproxy/hosts.go       |  40 ++--
 .../caddyhttp/reverseproxy/reverseproxy.go    | 188 +++++++++++-------
 .../reverseproxy/selectionpolicies_test.go    |  86 ++++----
 5 files changed, 227 insertions(+), 179 deletions(-)

diff --git a/modules/caddyhttp/reverseproxy/admin.go b/modules/caddyhttp/reverseproxy/admin.go
index 586b799f14e..81ec4358fe4 100644
--- a/modules/caddyhttp/reverseproxy/admin.go
+++ b/modules/caddyhttp/reverseproxy/admin.go
@@ -98,7 +98,6 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er
 
 		results = append(results, upstreamStatus{
 			Address:     address,
-			Healthy:     !upstream.Unhealthy(),
 			NumRequests: upstream.NumRequests(),
 			Fails:       upstream.Fails(),
 		})
diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go
index 7409e3ed417..6259d418228 100644
--- a/modules/caddyhttp/reverseproxy/healthchecks.go
+++ b/modules/caddyhttp/reverseproxy/healthchecks.go
@@ -18,7 +18,6 @@ import (
 	"context"
 	"fmt"
 	"io"
-	"log"
 	"net"
 	"net/http"
 	"net/url"
@@ -37,12 +36,32 @@ import (
 type HealthChecks struct {
 	// Active health checks run in the background on a timer. To
 	// minimally enable active health checks, set either path or
-	// port (or both).
+	// port (or both). Note that active health check status
+	// (healthy/unhealthy) is stored per-proxy-handler, not
+	// globally; this allows different handlers to use different
+	// criteria to decide what defines a healthy backend.
+	//
+	// Active health checks do not run for dynamic upstreams.
 	Active *ActiveHealthChecks `json:"active,omitempty"`
 
 	// Passive health checks monitor proxied requests for errors or timeouts.
 	// To minimally enable passive health checks, specify at least an empty
-	// config object.
+	// config object. Passive health check state is shared (stored globally),
+	// so a failure from one handler will be counted by all handlers; but
+	// the tolerances or standards for what defines healthy/unhealthy backends
+	// are configured per-proxy-handler.
+	//
+	// Passive health checks technically do operate on dynamic upstreams,
+	// but are only effective for very busy proxies where the list of
+	// upstreams is mostly stable. This is because the shared/global
+	// state of upstreams is cleaned up when the upstreams are no longer
+	// used. Since dynamic upstreams are allocated dynamically at each
+	// request (specifically, each iteration of the proxy loop per request),
+	// they are also cleaned up after every request. Thus, if there is a
+	// moment when no requests are actively referring to a particular
+	// upstream host, the passive health check state will be reset because
+	// it will be garbage-collected. It is usually better for the dynamic
+	// upstream module to only return healthy, available backends instead.
 	Passive *PassiveHealthChecks `json:"passive,omitempty"`
 }
 
@@ -132,7 +151,9 @@ type CircuitBreaker interface {
 func (h *Handler) activeHealthChecker() {
 	defer func() {
 		if err := recover(); err != nil {
-			log.Printf("[PANIC] active health checks: %v\n%s", err, debug.Stack())
+			h.HealthChecks.Active.logger.Error("active health checker panicked",
+				zap.Any("error", err),
+				zap.ByteString("stack", debug.Stack()))
 		}
 	}()
 	ticker := time.NewTicker(time.Duration(h.HealthChecks.Active.Interval))
@@ -155,7 +176,9 @@ func (h *Handler) doActiveHealthCheckForAllHosts() {
 		go func(upstream *Upstream) {
 			defer func() {
 				if err := recover(); err != nil {
-					log.Printf("[PANIC] active health check: %v\n%s", err, debug.Stack())
+					h.HealthChecks.Active.logger.Error("active health check panicked",
+						zap.Any("error", err),
+						zap.ByteString("stack", debug.Stack()))
 				}
 			}()
 
@@ -195,7 +218,7 @@ func (h *Handler) doActiveHealthCheckForAllHosts() {
 				// so use a fake Host value instead; unix sockets are usually local
 				hostAddr = "localhost"
 			}
-			err = h.doActiveHealthCheck(DialInfo{Network: addr.Network, Address: dialAddr}, hostAddr, upstream.Host)
+			err = h.doActiveHealthCheck(DialInfo{Network: addr.Network, Address: dialAddr}, hostAddr, upstream)
 			if err != nil {
 				h.HealthChecks.Active.logger.Error("active health check failed",
 					zap.String("address", hostAddr),
@@ -206,14 +229,14 @@
 	}
 }
 
-// doActiveHealthCheck performs a health check to host which
+// doActiveHealthCheck performs a health check to the upstream which
 // can be reached at address hostAddr. The actual address for
 // the request will be built according to active health checker
 // config. The health status of the host will be updated
 // according to whether it passes the health check. An error is
 // returned only if the health check fails to occur or if marking
 // the host's health status fails.
-func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host *Host) error { +func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstream *Upstream) error { // create the URL for the request that acts as a health check scheme := "http" if ht, ok := h.Transport.(TLSTransport); ok && ht.TLSEnabled() { @@ -269,10 +292,7 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host * zap.String("host", hostAddr), zap.Error(err), ) - _, err2 := host.SetHealthy(false) - if err2 != nil { - return fmt.Errorf("marking unhealthy: %v", err2) - } + upstream.setHealthy(false) return nil } var body io.Reader = resp.Body @@ -292,10 +312,7 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host * zap.Int("status_code", resp.StatusCode), zap.String("host", hostAddr), ) - _, err := host.SetHealthy(false) - if err != nil { - return fmt.Errorf("marking unhealthy: %v", err) - } + upstream.setHealthy(false) return nil } } else if resp.StatusCode < 200 || resp.StatusCode >= 400 { @@ -303,10 +320,7 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host * zap.Int("status_code", resp.StatusCode), zap.String("host", hostAddr), ) - _, err := host.SetHealthy(false) - if err != nil { - return fmt.Errorf("marking unhealthy: %v", err) - } + upstream.setHealthy(false) return nil } @@ -318,33 +332,21 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host * zap.String("host", hostAddr), zap.Error(err), ) - _, err := host.SetHealthy(false) - if err != nil { - return fmt.Errorf("marking unhealthy: %v", err) - } + upstream.setHealthy(false) return nil } if !h.HealthChecks.Active.bodyRegexp.Match(bodyBytes) { h.HealthChecks.Active.logger.Info("response body failed expectations", zap.String("host", hostAddr), ) - _, err := host.SetHealthy(false) - if err != nil { - return fmt.Errorf("marking unhealthy: %v", err) - } + upstream.setHealthy(false) return nil } } // passed health check parameters, so mark as healthy - swapped, err := host.SetHealthy(true) - if swapped { - h.HealthChecks.Active.logger.Info("host is up", - zap.String("host", hostAddr), - ) - } - if err != nil { - return fmt.Errorf("marking healthy: %v", err) + if upstream.setHealthy(true) { + h.HealthChecks.Active.logger.Info("host is up", zap.String("host", hostAddr)) } return nil @@ -366,7 +368,7 @@ func (h *Handler) countFailure(upstream *Upstream) { } // count failure immediately - err := upstream.Host.CountFail(1) + err := upstream.Host.countFail(1) if err != nil { h.HealthChecks.Passive.logger.Error("could not count failure", zap.String("host", upstream.Dial), @@ -378,11 +380,20 @@ func (h *Handler) countFailure(upstream *Upstream) { go func(host *Host, failDuration time.Duration) { defer func() { if err := recover(); err != nil { - log.Printf("[PANIC] health check failure forgetter: %v\n%s", err, debug.Stack()) + h.HealthChecks.Passive.logger.Error("passive health check failure forgetter panicked", + zap.Any("error", err), + zap.ByteString("stack", debug.Stack())) } }() - time.Sleep(failDuration) - err := host.CountFail(-1) + timer := time.NewTimer(failDuration) + select { + case <-h.ctx.Done(): + if !timer.Stop() { + <-timer.C + } + case <-timer.C: + } + err := host.countFail(-1) if err != nil { h.HealthChecks.Passive.logger.Error("could not forget failure", zap.String("host", upstream.Dial), diff --git a/modules/caddyhttp/reverseproxy/hosts.go b/modules/caddyhttp/reverseproxy/hosts.go index c8c6f42210d..a973ecba718 
100644 --- a/modules/caddyhttp/reverseproxy/hosts.go +++ b/modules/caddyhttp/reverseproxy/hosts.go @@ -31,6 +31,7 @@ type UpstreamPool []*Upstream // Upstream bridges this proxy's configuration to the // state of the backend host it is correlated with. +// Upstream values must not be copied. type Upstream struct { *Host `json:"-"` @@ -48,7 +49,7 @@ type Upstream struct { // DEPRECATED: Use the SRVUpstreams module instead // (http.reverse_proxy.upstreams.srv). This field will be - // removed in a future version of Caddy. + // removed in a future version of Caddy. TODO: Remove this field. // // If DNS SRV records are used for service discovery with this // upstream, specify the DNS name for which to look up SRV @@ -68,6 +69,7 @@ type Upstream struct { activeHealthCheckPort int healthCheckPolicy *PassiveHealthChecks cb CircuitBreaker + unhealthy int32 // accessed atomically; status from active health checker } func (u Upstream) String() string { @@ -90,7 +92,7 @@ func (u *Upstream) Available() bool { // is currently known to be healthy or "up". // It consults the circuit breaker, if any. func (u *Upstream) Healthy() bool { - healthy := !u.Host.Unhealthy() + healthy := u.healthy() if healthy && u.healthCheckPolicy != nil { healthy = u.Host.Fails() < u.healthCheckPolicy.MaxFails } @@ -115,7 +117,7 @@ func (u *Upstream) fillDialInfo(r *http.Request) (DialInfo, error) { var addr caddy.NetworkAddress if u.LookupSRV != "" { - // perform DNS lookup for SRV records and choose one + // perform DNS lookup for SRV records and choose one - TODO: deprecated srvName := repl.ReplaceAll(u.LookupSRV, "") _, records, err := net.DefaultResolver.LookupSRV(r.Context(), "", "", srvName) if err != nil { @@ -156,12 +158,11 @@ func (u *Upstream) fillHost() { u.Host = host } -// Host is the basic, in-memory representation -// of the state of a remote host. +// Host is the basic, in-memory representation of the state of a remote host. +// Its fields are accessed atomically and Host values must not be copied. type Host struct { numRequests int64 // must be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG) fails int64 - unhealthy int32 // TODO: this should be removed from Host; this decision should be made by the health checker } // NumRequests returns the number of active requests to the upstream. @@ -174,14 +175,9 @@ func (h *Host) Fails() int { return int(atomic.LoadInt64(&h.fails)) } -// Unhealthy returns whether the upstream is healthy. -func (h *Host) Unhealthy() bool { - return atomic.LoadInt32(&h.unhealthy) == 1 -} - -// CountRequest mutates the active request count by +// countRequest mutates the active request count by // delta. It returns an error if the adjustment fails. -func (h *Host) CountRequest(delta int) error { +func (h *Host) countRequest(delta int) error { result := atomic.AddInt64(&h.numRequests, int64(delta)) if result < 0 { return fmt.Errorf("count below 0: %d", result) @@ -189,9 +185,9 @@ func (h *Host) CountRequest(delta int) error { return nil } -// CountFail mutates the recent failures count by +// countFail mutates the recent failures count by // delta. It returns an error if the adjustment fails. -func (h *Host) CountFail(delta int) error { +func (h *Host) countFail(delta int) error { result := atomic.AddInt64(&h.fails, int64(delta)) if result < 0 { return fmt.Errorf("count below 0: %d", result) @@ -199,15 +195,21 @@ func (h *Host) CountFail(delta int) error { return nil } +// healthy returns true if the upstream is not actively marked as unhealthy. 
+// (This returns the status only from the "active" health checks.) +func (u *Upstream) healthy() bool { + return atomic.LoadInt32(&u.unhealthy) == 0 +} + // SetHealthy sets the upstream has healthy or unhealthy -// and returns true if the new value is different. -func (h *Host) SetHealthy(healthy bool) (bool, error) { +// and returns true if the new value is different. This +// sets the status only for the "active" health checks. +func (u *Upstream) setHealthy(healthy bool) bool { var unhealthy, compare int32 = 1, 0 if healthy { unhealthy, compare = 0, 1 } - swapped := atomic.CompareAndSwapInt32(&h.unhealthy, compare, unhealthy) - return swapped, nil + return atomic.CompareAndSwapInt32(&u.unhealthy, compare, unhealthy) } // DialInfo contains information needed to dial a diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index 5d5db964792..6d75edd8b62 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -78,10 +78,18 @@ type Handler struct { // up or down. Down backends will not be proxied to. HealthChecks *HealthChecks `json:"health_checks,omitempty"` - // Upstreams is the list of backends to proxy to. + // Upstreams is the static list of backends to proxy to. Upstreams UpstreamPool `json:"upstreams,omitempty"` - // TODO: godoc + // A module for retrieving the list of upstreams dynamically. Dynamic + // upstreams are retrieved at every iteration of the proxy loop for + // each request (i.e. before every proxy attempt within every request). + // Active health checks do not work on dynamic upstreams, and passive + // health checks are only effective on dynamic upstreams if the proxy + // server is busy enough that concurrent requests to the same backends + // are continuous. Instead of health checks for dynamic upstreams, it + // is recommended that the dynamic upstream module only return available + // backends in the first place. DynamicUpstreamsRaw json.RawMessage `json:"dynamic_upstreams,omitempty" caddy:"namespace=http.reverse_proxy.upstreams inline_key=source"` // Adjusts how often to flush the response buffer. By default, @@ -394,94 +402,122 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht repl.Set("http.reverse_proxy.duration", time.Since(start)) }() + // in the proxy loop, each iteration is an attempt to proxy the request, + // and because we may retry some number of times, carry over the error + // from previous tries because of the nuances of load balancing & retries var proxyErr error for { - // get the updated list of upstreams - upstreams := h.Upstreams - if h.DynamicUpstreams != nil { - dUpstreams, err := h.DynamicUpstreams.GetUpstreams(r) - if err != nil { - h.logger.Error("failed getting dynamic upstreams; falling back to static upstreams", zap.Error(err)) - } else { - upstreams = dUpstreams - } + var done bool + done, proxyErr = h.proxyLoopIteration(r, w, proxyErr, start, repl, reqHeader, reqHost, next) + if done { + break + } + } + + if proxyErr != nil { + return statusError(proxyErr) + } + + return nil +} + +// proxyLoopIteration implements an iteration of the proxy loop. Despite the enormous amount of local state +// that has to be passed in, we brought this into its own method so that we could run defer more easily. +// It returns true when the loop is done and should break; false otherwise. 
The error value returned should +// be assigned to the proxyErr value for the next iteration of the loop (or the error handled after break). +func (h *Handler) proxyLoopIteration(r *http.Request, w http.ResponseWriter, proxyErr error, start time.Time, + repl *caddy.Replacer, reqHeader http.Header, reqHost string, next caddyhttp.Handler) (bool, error) { + // get the updated list of upstreams + upstreams := h.Upstreams + if h.DynamicUpstreams != nil { + dUpstreams, err := h.DynamicUpstreams.GetUpstreams(r) + if err != nil { + h.logger.Error("failed getting dynamic upstreams; falling back to static upstreams", zap.Error(err)) + } else { + upstreams = dUpstreams for _, dUp := range dUpstreams { - // TODO: we should reuse the Host contained within upstreams... h.provisionUpstream(dUp) } + defer func() { + // these upstreams are dynamic, so they are only used for this iteration + // of the proxy loop; be sure to let them go away when we're done with them + for _, upstream := range dUpstreams { + _, _ = hosts.Delete(upstream.String()) + } + }() } + } - // choose an available upstream - upstream := h.LoadBalancing.SelectionPolicy.Select(upstreams, r, w) - if upstream == nil { - if proxyErr == nil { - proxyErr = fmt.Errorf("no upstreams available") - } - if !h.LoadBalancing.tryAgain(h.ctx, start, proxyErr, r) { - break - } - continue + // choose an available upstream + upstream := h.LoadBalancing.SelectionPolicy.Select(upstreams, r, w) + if upstream == nil { + if proxyErr == nil { + proxyErr = fmt.Errorf("no upstreams available") } - - // the dial address may vary per-request if placeholders are - // used, so perform those replacements here; the resulting - // DialInfo struct should have valid network address syntax - dialInfo, err := upstream.fillDialInfo(r) - if err != nil { - return statusError(fmt.Errorf("making dial info: %v", err)) + if !h.LoadBalancing.tryAgain(h.ctx, start, proxyErr, r) { + return true, proxyErr } + return false, proxyErr + } - // attach to the request information about how to dial the upstream; - // this is necessary because the information cannot be sufficiently - // or satisfactorily represented in a URL - caddyhttp.SetVar(r.Context(), dialInfoVarKey, dialInfo) - - // set placeholders with information about this upstream - repl.Set("http.reverse_proxy.upstream.address", dialInfo.String()) - repl.Set("http.reverse_proxy.upstream.hostport", dialInfo.Address) - repl.Set("http.reverse_proxy.upstream.host", dialInfo.Host) - repl.Set("http.reverse_proxy.upstream.port", dialInfo.Port) - repl.Set("http.reverse_proxy.upstream.requests", upstream.Host.NumRequests()) - repl.Set("http.reverse_proxy.upstream.max_requests", upstream.MaxRequests) - repl.Set("http.reverse_proxy.upstream.fails", upstream.Host.Fails()) - - // mutate request headers according to this upstream; - // because we're in a retry loop, we have to copy - // headers (and the r.Host value) from the original - // so that each retry is identical to the first - if h.Headers != nil && h.Headers.Request != nil { - r.Header = make(http.Header) - copyHeader(r.Header, reqHeader) - r.Host = reqHost - h.Headers.Request.ApplyToRequest(r) - } + // the dial address may vary per-request if placeholders are + // used, so perform those replacements here; the resulting + // DialInfo struct should have valid network address syntax + dialInfo, err := upstream.fillDialInfo(r) + if err != nil { + return true, fmt.Errorf("making dial info: %v", err) + } - // proxy the request to that upstream - proxyErr = h.reverseProxy(w, r, repl, 
dialInfo, next) - if proxyErr == nil || proxyErr == context.Canceled { - // context.Canceled happens when the downstream client - // cancels the request, which is not our failure - return nil - } + // attach to the request information about how to dial the upstream; + // this is necessary because the information cannot be sufficiently + // or satisfactorily represented in a URL + caddyhttp.SetVar(r.Context(), dialInfoVarKey, dialInfo) + + // set placeholders with information about this upstream + repl.Set("http.reverse_proxy.upstream.address", dialInfo.String()) + repl.Set("http.reverse_proxy.upstream.hostport", dialInfo.Address) + repl.Set("http.reverse_proxy.upstream.host", dialInfo.Host) + repl.Set("http.reverse_proxy.upstream.port", dialInfo.Port) + repl.Set("http.reverse_proxy.upstream.requests", upstream.Host.NumRequests()) + repl.Set("http.reverse_proxy.upstream.max_requests", upstream.MaxRequests) + repl.Set("http.reverse_proxy.upstream.fails", upstream.Host.Fails()) + + // mutate request headers according to this upstream; + // because we're in a retry loop, we have to copy + // headers (and the r.Host value) from the original + // so that each retry is identical to the first + if h.Headers != nil && h.Headers.Request != nil { + r.Header = make(http.Header) + copyHeader(r.Header, reqHeader) + r.Host = reqHost + h.Headers.Request.ApplyToRequest(r) + } - // if the roundtrip was successful, don't retry the request or - // ding the health status of the upstream (an error can still - // occur after the roundtrip if, for example, a response handler - // after the roundtrip returns an error) - if succ, ok := proxyErr.(roundtripSucceeded); ok { - return succ.error - } + // proxy the request to that upstream + proxyErr = h.reverseProxy(w, r, repl, dialInfo, next) + if proxyErr == nil || proxyErr == context.Canceled { + // context.Canceled happens when the downstream client + // cancels the request, which is not our failure + return true, nil + } - // remember this failure (if enabled) - h.countFailure(upstream) + // if the roundtrip was successful, don't retry the request or + // ding the health status of the upstream (an error can still + // occur after the roundtrip if, for example, a response handler + // after the roundtrip returns an error) + if succ, ok := proxyErr.(roundtripSucceeded); ok { + return true, succ.error + } - // if we've tried long enough, break - if !h.LoadBalancing.tryAgain(h.ctx, start, proxyErr, r) { - break - } + // remember this failure (if enabled) + h.countFailure(upstream) + + // if we've tried long enough, break + if !h.LoadBalancing.tryAgain(h.ctx, start, proxyErr, r) { + return true, proxyErr } - return statusError(proxyErr) + return false, proxyErr } // prepareRequest modifies req so that it is ready to be proxied, @@ -563,9 +599,9 @@ func (h Handler) prepareRequest(req *http.Request) error { // (This method is mostly the beginning of what was borrowed from the net/http/httputil package in the // Go standard library which was used as the foundation.) 
func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, repl *caddy.Replacer, di DialInfo, next caddyhttp.Handler) error { - _ = di.Upstream.Host.CountRequest(1) + _ = di.Upstream.Host.countRequest(1) //nolint:errcheck - defer di.Upstream.Host.CountRequest(-1) + defer di.Upstream.Host.countRequest(-1) // point the request to this upstream h.directRequest(req, di) diff --git a/modules/caddyhttp/reverseproxy/selectionpolicies_test.go b/modules/caddyhttp/reverseproxy/selectionpolicies_test.go index 9cb205307dd..7175f774b10 100644 --- a/modules/caddyhttp/reverseproxy/selectionpolicies_test.go +++ b/modules/caddyhttp/reverseproxy/selectionpolicies_test.go @@ -48,20 +48,20 @@ func TestRoundRobinPolicy(t *testing.T) { t.Error("Expected third round robin host to be first host in the pool.") } // mark host as down - pool[1].SetHealthy(false) + pool[1].setHealthy(false) h = rrPolicy.Select(pool, req, nil) if h != pool[2] { t.Error("Expected to skip down host.") } // mark host as up - pool[1].SetHealthy(true) + pool[1].setHealthy(true) h = rrPolicy.Select(pool, req, nil) if h == pool[2] { t.Error("Expected to balance evenly among healthy hosts") } // mark host as full - pool[1].CountRequest(1) + pool[1].countRequest(1) pool[1].MaxRequests = 1 h = rrPolicy.Select(pool, req, nil) if h != pool[2] { @@ -74,13 +74,13 @@ func TestLeastConnPolicy(t *testing.T) { lcPolicy := new(LeastConnSelection) req, _ := http.NewRequest("GET", "/", nil) - pool[0].CountRequest(10) - pool[1].CountRequest(10) + pool[0].countRequest(10) + pool[1].countRequest(10) h := lcPolicy.Select(pool, req, nil) if h != pool[2] { t.Error("Expected least connection host to be third host.") } - pool[2].CountRequest(100) + pool[2].countRequest(100) h = lcPolicy.Select(pool, req, nil) if h != pool[0] && h != pool[1] { t.Error("Expected least connection host to be first or second host.") @@ -139,7 +139,7 @@ func TestIPHashPolicy(t *testing.T) { // we should get a healthy host if the original host is unhealthy and a // healthy host is available req.RemoteAddr = "172.0.0.1" - pool[1].SetHealthy(false) + pool[1].setHealthy(false) h = ipHash.Select(pool, req, nil) if h != pool[2] { t.Error("Expected ip hash policy host to be the third host.") @@ -150,10 +150,10 @@ func TestIPHashPolicy(t *testing.T) { if h != pool[2] { t.Error("Expected ip hash policy host to be the third host.") } - pool[1].SetHealthy(true) + pool[1].setHealthy(true) req.RemoteAddr = "172.0.0.3" - pool[2].SetHealthy(false) + pool[2].setHealthy(false) h = ipHash.Select(pool, req, nil) if h != pool[0] { t.Error("Expected ip hash policy host to be the first host.") @@ -192,8 +192,8 @@ func TestIPHashPolicy(t *testing.T) { } // We should get nil when there are no healthy hosts - pool[0].SetHealthy(false) - pool[1].SetHealthy(false) + pool[0].setHealthy(false) + pool[1].setHealthy(false) h = ipHash.Select(pool, req, nil) if h != nil { t.Error("Expected ip hash policy host to be nil.") @@ -211,15 +211,15 @@ func TestIPHashPolicy(t *testing.T) { {Host: new(Host)}, {Host: new(Host)}, } - pool[0].SetHealthy(false) - pool[1].SetHealthy(false) - pool[2].SetHealthy(false) - pool[3].SetHealthy(false) - pool[4].SetHealthy(false) - pool[5].SetHealthy(false) - pool[6].SetHealthy(false) - pool[7].SetHealthy(false) - pool[8].SetHealthy(true) + pool[0].setHealthy(false) + pool[1].setHealthy(false) + pool[2].setHealthy(false) + pool[3].setHealthy(false) + pool[4].setHealthy(false) + pool[5].setHealthy(false) + pool[6].setHealthy(false) + pool[7].setHealthy(false) + 
pool[8].setHealthy(true) // We should get a result back when there is one healthy host left. h = ipHash.Select(pool, req, nil) @@ -239,7 +239,7 @@ func TestFirstPolicy(t *testing.T) { t.Error("Expected first policy host to be the first host.") } - pool[0].SetHealthy(false) + pool[0].setHealthy(false) h = firstPolicy.Select(pool, req, nil) if h != pool[1] { t.Error("Expected first policy host to be the second host.") @@ -256,7 +256,7 @@ func TestURIHashPolicy(t *testing.T) { t.Error("Expected uri policy host to be the first host.") } - pool[0].SetHealthy(false) + pool[0].setHealthy(false) h = uriPolicy.Select(pool, request, nil) if h != pool[1] { t.Error("Expected uri policy host to be the first host.") @@ -281,7 +281,7 @@ func TestURIHashPolicy(t *testing.T) { t.Error("Expected uri policy host to be the first host.") } - pool[0].SetHealthy(false) + pool[0].setHealthy(false) h = uriPolicy.Select(pool, request, nil) if h != pool[1] { t.Error("Expected uri policy host to be the first host.") @@ -293,8 +293,8 @@ func TestURIHashPolicy(t *testing.T) { t.Error("Expected uri policy host to be the second host.") } - pool[0].SetHealthy(false) - pool[1].SetHealthy(false) + pool[0].setHealthy(false) + pool[1].setHealthy(false) h = uriPolicy.Select(pool, request, nil) if h != nil { t.Error("Expected uri policy policy host to be nil.") @@ -306,12 +306,12 @@ func TestLeastRequests(t *testing.T) { pool[0].Dial = "localhost:8080" pool[1].Dial = "localhost:8081" pool[2].Dial = "localhost:8082" - pool[0].SetHealthy(true) - pool[1].SetHealthy(true) - pool[2].SetHealthy(true) - pool[0].CountRequest(10) - pool[1].CountRequest(20) - pool[2].CountRequest(30) + pool[0].setHealthy(true) + pool[1].setHealthy(true) + pool[2].setHealthy(true) + pool[0].countRequest(10) + pool[1].countRequest(20) + pool[2].countRequest(30) result := leastRequests(pool) @@ -329,12 +329,12 @@ func TestRandomChoicePolicy(t *testing.T) { pool[0].Dial = "localhost:8080" pool[1].Dial = "localhost:8081" pool[2].Dial = "localhost:8082" - pool[0].SetHealthy(false) - pool[1].SetHealthy(true) - pool[2].SetHealthy(true) - pool[0].CountRequest(10) - pool[1].CountRequest(20) - pool[2].CountRequest(30) + pool[0].setHealthy(false) + pool[1].setHealthy(true) + pool[2].setHealthy(true) + pool[0].countRequest(10) + pool[1].countRequest(20) + pool[2].countRequest(30) request := httptest.NewRequest(http.MethodGet, "/test", nil) randomChoicePolicy := new(RandomChoiceSelection) @@ -357,9 +357,9 @@ func TestCookieHashPolicy(t *testing.T) { pool[0].Dial = "localhost:8080" pool[1].Dial = "localhost:8081" pool[2].Dial = "localhost:8082" - pool[0].SetHealthy(true) - pool[1].SetHealthy(false) - pool[2].SetHealthy(false) + pool[0].setHealthy(true) + pool[1].setHealthy(false) + pool[2].setHealthy(false) request := httptest.NewRequest(http.MethodGet, "/test", nil) w := httptest.NewRecorder() cookieHashPolicy := new(CookieHashSelection) @@ -374,8 +374,8 @@ func TestCookieHashPolicy(t *testing.T) { if h != pool[0] { t.Error("Expected cookieHashPolicy host to be the first only available host.") } - pool[1].SetHealthy(true) - pool[2].SetHealthy(true) + pool[1].setHealthy(true) + pool[2].setHealthy(true) request = httptest.NewRequest(http.MethodGet, "/test", nil) w = httptest.NewRecorder() request.AddCookie(cookieServer1) @@ -387,7 +387,7 @@ func TestCookieHashPolicy(t *testing.T) { if len(s) != 0 { t.Error("Expected cookieHashPolicy to not set a new cookie.") } - pool[0].SetHealthy(false) + pool[0].setHealthy(false) request = httptest.NewRequest(http.MethodGet, "/test", 
nil)
 	w = httptest.NewRecorder()
 	request.AddCookie(cookieServer1)

From d156963f186a0055a2a3d3056e9fdabf88c06de7 Mon Sep 17 00:00:00 2001
From: Matthew Holt
Date: Sun, 16 Jan 2022 17:28:04 -0700
Subject: [PATCH 08/14] Deprecation notice

---
 modules/caddyhttp/reverseproxy/healthchecks.go | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go
index 6259d418228..317b283ecd8 100644
--- a/modules/caddyhttp/reverseproxy/healthchecks.go
+++ b/modules/caddyhttp/reverseproxy/healthchecks.go
@@ -69,8 +69,7 @@ type HealthChecks struct {
 // health checks (that is, health checks which occur in a
 // background goroutine independently).
 type ActiveHealthChecks struct {
-	// The path to use for health checks.
-	// DEPRECATED: Use 'uri' instead.
+	// DEPRECATED: Use 'uri' instead. This field will be removed. TODO: remove this field
 	Path string `json:"path,omitempty"`
 
 	// The URI (path and query) to use for health checks

From d142c7a22a982248182a26d3a727934ee531ec4b Mon Sep 17 00:00:00 2001
From: Francis Lavoie
Date: Mon, 24 Jan 2022 03:51:10 -0500
Subject: [PATCH 09/14] Add Caddyfile support, use `caddy.Duration`

---
 .../reverse_proxy_dynamic_upstreams.txt       |  96 +++++++++++
 modules/caddyhttp/reverseproxy/caddyfile.go   | 151 ++++++++++++++++++
 modules/caddyhttp/reverseproxy/upstreams.go   |  34 ++--
 3 files changed, 271 insertions(+), 10 deletions(-)
 create mode 100644 caddytest/integration/caddyfile_adapt/reverse_proxy_dynamic_upstreams.txt

diff --git a/caddytest/integration/caddyfile_adapt/reverse_proxy_dynamic_upstreams.txt b/caddytest/integration/caddyfile_adapt/reverse_proxy_dynamic_upstreams.txt
new file mode 100644
index 00000000000..bef95d10a6f
--- /dev/null
+++ b/caddytest/integration/caddyfile_adapt/reverse_proxy_dynamic_upstreams.txt
@@ -0,0 +1,96 @@
+:8884 {
+	reverse_proxy {
+		dynamic a foo 9000
+	}
+
+	reverse_proxy {
+		dynamic a {
+			name foo
+			port 9000
+			refresh 5m
+		}
+	}
+}
+
+:8885 {
+	reverse_proxy {
+		dynamic srv _api._http.example.com
+	}
+
+	reverse_proxy {
+		dynamic srv {
+			service api
+			proto http
+			name example.com
+			refresh 5m
+		}
+	}
+}
+
+----------
+{
+	"apps": {
+		"http": {
+			"servers": {
+				"srv0": {
+					"listen": [
+						":8884"
+					],
+					"routes": [
+						{
+							"handle": [
+								{
+									"dynamic_upstreams": {
+										"name": "foo",
+										"port": "9000",
+										"source": "a"
+									},
+									"handler": "reverse_proxy"
+								},
+								{
+									"dynamic_upstreams": {
+										"name": "foo",
+										"port": "9000",
+										"refresh": 300000000000,
+										"source": "a"
+									},
+									"handler": "reverse_proxy"
+								}
+							]
+						}
+					]
+				},
+				"srv1": {
+					"listen": [
+						":8885"
+					],
+					"routes": [
+						{
+							"handle": [
+								{
+									"dynamic_upstreams": {
+										"name": "example.com",
+										"proto": "http",
+										"service": "api",
+										"source": "srv"
+									},
+									"handler": "reverse_proxy"
+								},
+								{
+									"dynamic_upstreams": {
+										"name": "example.com",
+										"proto": "http",
+										"refresh": 300000000000,
+										"service": "api",
+										"source": "srv"
+									},
+									"handler": "reverse_proxy"
+								}
+							]
+						}
+					]
+				}
+			}
+		}
+	}
+}
\ No newline at end of file
diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go
index 52282f7bcd3..4da1b8fe90f 100644
--- a/modules/caddyhttp/reverseproxy/caddyfile.go
+++ b/modules/caddyhttp/reverseproxy/caddyfile.go
@@ -54,6 +54,7 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error)
 //     reverse_proxy [<matcher>] [<upstreams...>] {
 //         # upstreams
 //         to <upstreams...>
+//         dynamic <name> [...]
 //
 //         # load balancing
 //         lb_policy <name> [<options...>]
@@ -270,6 +271,25 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
 				}
 			}
 
+		case "dynamic":
+			if !d.NextArg() {
+				return d.ArgErr()
+			}
+			if h.DynamicUpstreams != nil {
+				return d.Err("dynamic upstreams already specified")
+			}
+			dynModule := d.Val()
+			modID := "http.reverse_proxy.upstreams." + dynModule
+			unm, err := caddyfile.UnmarshalModule(d, modID)
+			if err != nil {
+				return err
+			}
+			source, ok := unm.(UpstreamSource)
+			if !ok {
+				return d.Errf("module %s (%T) is not an UpstreamSource", modID, unm)
+			}
+			h.DynamicUpstreamsRaw = caddyconfig.JSONModuleObject(source, "source", dynModule, nil)
+
 		case "lb_policy":
 			if !d.NextArg() {
 				return d.ArgErr()
@@ -1037,6 +1057,137 @@ func (h *HTTPTransport) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
 	return nil
 }
 
+// UnmarshalCaddyfile deserializes Caddyfile tokens into u.
+//
+//     dynamic srv [<address>] {
+//         service <service>
+//         proto <proto>
+//         name <name>
+//         refresh <interval>
+//     }
+//
+func (u *SRVUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
+	for d.Next() {
+		args := d.RemainingArgs()
+		if len(args) > 1 {
+			return d.ArgErr()
+		}
+		if len(args) > 0 {
+			service, proto, name, err := u.ParseAddress(args[0])
+			if err != nil {
+				return err
+			}
+			u.Service = service
+			u.Proto = proto
+			u.Name = name
+		}
+
+		for d.NextBlock(0) {
+			switch d.Val() {
+			case "service":
+				if !d.NextArg() {
+					return d.ArgErr()
+				}
+				if u.Service != "" {
+					return d.Errf("srv service has already been specified")
+				}
+				u.Service = d.Val()
+
+			case "proto":
+				if !d.NextArg() {
+					return d.ArgErr()
+				}
+				if u.Proto != "" {
+					return d.Errf("srv proto has already been specified")
+				}
+				u.Proto = d.Val()
+
+			case "name":
+				if !d.NextArg() {
+					return d.ArgErr()
+				}
+				if u.Name != "" {
+					return d.Errf("srv name has already been specified")
+				}
+				u.Name = d.Val()
+
+			case "refresh":
+				if !d.NextArg() {
+					return d.ArgErr()
+				}
+				dur, err := caddy.ParseDuration(d.Val())
+				if err != nil {
+					return d.Errf("parsing refresh interval duration: %v", err)
+				}
+				u.Refresh = caddy.Duration(dur)
+
+			default:
+				return d.Errf("unrecognized srv option '%s'", d.Val())
+			}
+		}
+	}
+
+	return nil
+}
+
+// UnmarshalCaddyfile deserializes Caddyfile tokens into u.
+//
+//     dynamic a [<name> <port>] {
+//         name <name>
+//         port <port>
+//         refresh <interval>
+//     }
+//
+func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
+	for d.Next() {
+		args := d.RemainingArgs()
+		if len(args) > 2 {
+			return d.ArgErr()
+		}
+		if len(args) > 0 {
+			u.Name = args[0]
+			if len(args) == 2 {
+				u.Port = args[1]
+			}
+		}
+
+		for d.NextBlock(0) {
+			switch d.Val() {
+			case "name":
+				if !d.NextArg() {
+					return d.ArgErr()
+				}
+				if u.Name != "" {
+					return d.Errf("a name has already been specified")
+				}
+				u.Name = d.Val()
+
+			case "port":
+				if !d.NextArg() {
+					return d.ArgErr()
+				}
+				if u.Port != "" {
+					return d.Errf("a port has already been specified")
+				}
+				u.Port = d.Val()
+
+			case "refresh":
+				if !d.NextArg() {
+					return d.ArgErr()
+				}
+				dur, err := caddy.ParseDuration(d.Val())
+				if err != nil {
+					return d.Errf("parsing refresh interval duration: %v", err)
+				}
+				u.Refresh = caddy.Duration(dur)
+
+			default:
+				return d.Errf("unrecognized a option '%s'", d.Val())
+			}
+		}
+	}
+
+	return nil
+}
+
 const matcherPrefix = "@"
 
 // Interface guards
diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go
index b602a49054c..1aabdfdb8f5 100644
--- a/modules/caddyhttp/reverseproxy/upstreams.go
+++ b/modules/caddyhttp/reverseproxy/upstreams.go
@@ -5,6 +5,7 @@ import (
 	"net"
 	"net/http"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 
@@ -30,10 +31,6 @@ func init() {
 //
 // Returned upstreams are sorted by priority and weight.
 type SRVUpstreams struct {
-	// The interval at which to refresh the SRV lookup.
-	// Results are cached between lookups. Default: 1m
-	Refresh time.Duration `json:"refresh,omitempty"`
-
 	// The service label.
 	Service string `json:"service,omitempty"`
 
@@ -44,6 +41,10 @@ type SRVUpstreams struct {
 	// empty, the entire domain name to look up.
 	Name string `json:"name,omitempty"`
 
+	// The interval at which to refresh the SRV lookup.
+	// Results are cached between lookups. Default: 1m
+	Refresh caddy.Duration `json:"refresh,omitempty"`
+
 	logger *zap.Logger
 }
 
@@ -66,7 +67,7 @@ func (su *SRVUpstreams) Provision(ctx caddy.Context) error {
 		return fmt.Errorf("invalid proto '%s'", su.Proto)
 	}
 	if su.Refresh == 0 {
-		su.Refresh = time.Minute
+		su.Refresh = caddy.Duration(time.Minute)
 	}
 	return nil
 }
@@ -135,6 +136,19 @@ func (su SRVUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
 	return upstreams, nil
 }
 
+// ParseAddress takes a full SRV address and parses it into
+// its three constituent parts. Used for parsing configuration
+// so that users may specify the address with dots instead of
+// specifying each part separately.
+func (SRVUpstreams) ParseAddress(address string) (string, string, string, error) {
+	parts := strings.SplitN(address, ".", 3)
+	if len(parts) != 3 {
+		return "", "", "", fmt.Errorf("expected 3 parts (service, proto, name), got %d parts", len(parts))
+	}
+
+	return strings.TrimLeft(parts[0], "_"), strings.TrimLeft(parts[1], "_"), strings.TrimLeft(parts[2], "_"), nil
+}
+
 type srvLookup struct {
 	srvUpstreams SRVUpstreams
 	freshness    time.Time
@@ -142,7 +156,7 @@ type srvLookup struct {
 }
 
 func (sl srvLookup) isFresh() bool {
-	return time.Since(sl.freshness) < sl.srvUpstreams.Refresh
+	return time.Since(sl.freshness) < time.Duration(sl.srvUpstreams.Refresh)
 }
 
 var (
@@ -160,9 +174,9 @@ type AUpstreams struct {
 	// The port to use with the upstreams. Default: 80
 	Port string `json:"port,omitempty"`
 
-	// The interval at which to refresh the SRV lookup.
+	// The interval at which to refresh the A lookup.
 	// Results are cached between lookups. Default: 1m
-	Refresh time.Duration `json:"refresh,omitempty"`
+	Refresh caddy.Duration `json:"refresh,omitempty"`
 }
 
 // CaddyModule returns the Caddy module information.
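The type change above, from time.Duration to caddy.Duration, is what makes `refresh 5m` in the new adapt test serialize as the integer 300000000000: caddy.Duration unmarshals human-readable JSON strings via caddy.ParseDuration, while a plain time.Duration would only accept raw nanoseconds. A self-contained sketch of that behavior (illustration only, not part of the patch):

	package main

	import (
		"encoding/json"
		"fmt"
		"time"

		"github.com/caddyserver/caddy/v2"
	)

	func main() {
		// caddy.Duration accepts "5m" where time.Duration would require
		// the raw nanosecond count 300000000000.
		var refresh caddy.Duration
		if err := json.Unmarshal([]byte(`"5m"`), &refresh); err != nil {
			panic(err)
		}
		fmt.Println(time.Duration(refresh)) // 5m0s
	}
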
@@ -177,7 +191,7 @@ func (au AUpstreams) String() string { return au.Name } func (au *AUpstreams) Provision(_ caddy.Context) error { if au.Refresh == 0 { - au.Refresh = time.Minute + au.Refresh = caddy.Duration(time.Minute) } if au.Port == "" { au.Port = "80" @@ -248,7 +262,7 @@ type aLookup struct { } func (al aLookup) isFresh() bool { - return time.Since(al.freshness) < al.aUpstreams.Refresh + return time.Since(al.freshness) < time.Duration(al.aUpstreams.Refresh) } var ( From d4683b79bd07da8712371ab29bc25c9b38e5aa2f Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Mon, 24 Jan 2022 03:59:53 -0500 Subject: [PATCH 10/14] Interface guards --- modules/caddyhttp/reverseproxy/caddyfile.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go index 4da1b8fe90f..09728bb0f2b 100644 --- a/modules/caddyhttp/reverseproxy/caddyfile.go +++ b/modules/caddyhttp/reverseproxy/caddyfile.go @@ -1194,4 +1194,6 @@ const matcherPrefix = "@" var ( _ caddyfile.Unmarshaler = (*Handler)(nil) _ caddyfile.Unmarshaler = (*HTTPTransport)(nil) + _ caddyfile.Unmarshaler = (*SRVUpstreams)(nil) + _ caddyfile.Unmarshaler = (*AUpstreams)(nil) ) From c9d0e1db5054e85f32a7d77fb65133c877cede73 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Mon, 24 Jan 2022 19:02:58 -0500 Subject: [PATCH 11/14] Implement custom resolvers, add resolvers to http transport Caddyfile --- .../reverse_proxy_dynamic_upstreams.txt | 22 ++++ .../caddyfile_adapt/reverse_proxy_options.txt | 7 ++ modules/caddyhttp/reverseproxy/caddyfile.go | 88 ++++++++++++-- .../caddyhttp/reverseproxy/httptransport.go | 24 +--- modules/caddyhttp/reverseproxy/upstreams.go | 110 +++++++++++++++++- 5 files changed, 221 insertions(+), 30 deletions(-) diff --git a/caddytest/integration/caddyfile_adapt/reverse_proxy_dynamic_upstreams.txt b/caddytest/integration/caddyfile_adapt/reverse_proxy_dynamic_upstreams.txt index bef95d10a6f..76dd41fda17 100644 --- a/caddytest/integration/caddyfile_adapt/reverse_proxy_dynamic_upstreams.txt +++ b/caddytest/integration/caddyfile_adapt/reverse_proxy_dynamic_upstreams.txt @@ -8,6 +8,9 @@ name foo port 9000 refresh 5m + resolvers 8.8.8.8 8.8.4.4 + dial_timeout 2s + dial_fallback_delay 300ms } } } @@ -23,6 +26,9 @@ proto http name example.com refresh 5m + resolvers 8.8.8.8 8.8.4.4 + dial_timeout 1s + dial_fallback_delay -1s } } } @@ -49,9 +55,17 @@ }, { "dynamic_upstreams": { + "dial_fallback_delay": 300000000, + "dial_timeout": 2000000000, "name": "foo", "port": "9000", "refresh": 300000000000, + "resolver": { + "addresses": [ + "8.8.8.8", + "8.8.4.4" + ] + }, "source": "a" }, "handler": "reverse_proxy" @@ -78,9 +92,17 @@ }, { "dynamic_upstreams": { + "dial_fallback_delay": -1000000000, + "dial_timeout": 1000000000, "name": "example.com", "proto": "http", "refresh": 300000000000, + "resolver": { + "addresses": [ + "8.8.8.8", + "8.8.4.4" + ] + }, "service": "api", "source": "srv" }, diff --git a/caddytest/integration/caddyfile_adapt/reverse_proxy_options.txt b/caddytest/integration/caddyfile_adapt/reverse_proxy_options.txt index 70e7af602cb..e41b9004fc5 100644 --- a/caddytest/integration/caddyfile_adapt/reverse_proxy_options.txt +++ b/caddytest/integration/caddyfile_adapt/reverse_proxy_options.txt @@ -17,6 +17,7 @@ https://example.com { dial_fallback_delay 5s response_header_timeout 8s expect_continue_timeout 9s + resolvers 8.8.8.8 8.8.4.4 versions h2c 2 compression off @@ -88,6 +89,12 @@ https://example.com { "max_response_header_size": 30000000, 
"protocol": "http", "read_buffer_size": 10000000, + "resolver": { + "addresses": [ + "8.8.8.8", + "8.8.4.4" + ] + }, "response_header_timeout": 8000000000, "versions": [ "h2c", diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go index 09728bb0f2b..140c0c5694c 100644 --- a/modules/caddyhttp/reverseproxy/caddyfile.go +++ b/modules/caddyhttp/reverseproxy/caddyfile.go @@ -817,6 +817,7 @@ func (h *Handler) FinalizeUnmarshalCaddyfile(helper httpcaddyfile.Helper) error // dial_fallback_delay // response_header_timeout // expect_continue_timeout +// resolvers // tls // tls_client_auth | // tls_insecure_skip_verify @@ -907,6 +908,15 @@ func (h *HTTPTransport) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } h.ExpectContinueTimeout = caddy.Duration(dur) + case "resolvers": + if h.Resolver == nil { + h.Resolver = new(UpstreamResolver) + } + h.Resolver.Addresses = d.RemainingArgs() + if len(h.Resolver.Addresses) == 0 { + return d.Errf("must specify at least one resolver address") + } + case "tls_client_auth": if h.TLS == nil { h.TLS = new(TLSConfig) @@ -1060,10 +1070,13 @@ func (h *HTTPTransport) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { // UnmarshalCaddyfile deserializes Caddyfile tokens into h. // // dynamic srv [
] { -// service -// proto -// name -// refresh +// service +// proto +// name +// refresh +// resolvers +// dial_timeout +// dial_fallback_delay // } // func (u *SRVUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { @@ -1121,6 +1134,35 @@ func (u *SRVUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } u.Refresh = caddy.Duration(dur) + case "resolvers": + if u.Resolver == nil { + u.Resolver = new(UpstreamResolver) + } + u.Resolver.Addresses = d.RemainingArgs() + if len(u.Resolver.Addresses) == 0 { + return d.Errf("must specify at least one resolver address") + } + + case "dial_timeout": + if !d.NextArg() { + return d.ArgErr() + } + dur, err := caddy.ParseDuration(d.Val()) + if err != nil { + return d.Errf("bad timeout value '%s': %v", d.Val(), err) + } + u.DialTimeout = caddy.Duration(dur) + + case "dial_fallback_delay": + if !d.NextArg() { + return d.ArgErr() + } + dur, err := caddy.ParseDuration(d.Val()) + if err != nil { + return d.Errf("bad delay value '%s': %v", d.Val(), err) + } + u.FallbackDelay = caddy.Duration(dur) + default: return d.Errf("unrecognized srv option '%s'", d.Val()) } @@ -1133,9 +1175,12 @@ func (u *SRVUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { // UnmarshalCaddyfile deserializes Caddyfile tokens into h. // // dynamic a [ -// port -// refresh +// name +// port +// refresh +// resolvers +// dial_timeout +// dial_fallback_delay // } // func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { @@ -1179,6 +1224,35 @@ func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } u.Refresh = caddy.Duration(dur) + case "resolvers": + if u.Resolver == nil { + u.Resolver = new(UpstreamResolver) + } + u.Resolver.Addresses = d.RemainingArgs() + if len(u.Resolver.Addresses) == 0 { + return d.Errf("must specify at least one resolver address") + } + + case "dial_timeout": + if !d.NextArg() { + return d.ArgErr() + } + dur, err := caddy.ParseDuration(d.Val()) + if err != nil { + return d.Errf("bad timeout value '%s': %v", d.Val(), err) + } + u.DialTimeout = caddy.Duration(dur) + + case "dial_fallback_delay": + if !d.NextArg() { + return d.ArgErr() + } + dur, err := caddy.ParseDuration(d.Val()) + if err != nil { + return d.Errf("bad delay value '%s': %v", d.Val(), err) + } + u.FallbackDelay = caddy.Duration(dur) + default: return d.Errf("unrecognized srv option '%s'", d.Val()) } diff --git a/modules/caddyhttp/reverseproxy/httptransport.go b/modules/caddyhttp/reverseproxy/httptransport.go index 4be51afe1fd..f7472bea2d6 100644 --- a/modules/caddyhttp/reverseproxy/httptransport.go +++ b/modules/caddyhttp/reverseproxy/httptransport.go @@ -168,15 +168,9 @@ func (h *HTTPTransport) NewTransport(ctx caddy.Context) (*http.Transport, error) } if h.Resolver != nil { - for _, v := range h.Resolver.Addresses { - addr, err := caddy.ParseNetworkAddress(v) - if err != nil { - return nil, err - } - if addr.PortRangeSize() != 1 { - return nil, fmt.Errorf("resolver address must have exactly one address; cannot call %v", addr) - } - h.Resolver.netAddrs = append(h.Resolver.netAddrs, addr) + err := h.Resolver.ParseAddresses() + if err != nil { + return nil, err } d := &net.Dialer{ Timeout: time.Duration(h.DialTimeout), @@ -406,18 +400,6 @@ func (t TLSConfig) MakeTLSClientConfig(ctx caddy.Context) (*tls.Config, error) { return cfg, nil } -// UpstreamResolver holds the set of addresses of DNS resolvers of -// upstream addresses -type UpstreamResolver struct { - // The addresses of DNS resolvers to use when looking up the addresses of proxy 
upstreams. - // It accepts [network addresses](/docs/conventions#network-addresses) - // with port range of only 1. If the host is an IP address, it will be dialed directly to resolve the upstream server. - // If the host is not an IP address, the addresses are resolved using the [name resolution convention](https://golang.org/pkg/net/#hdr-Name_Resolution) of the Go standard library. - // If the array contains more than 1 resolver address, one is chosen at random. - Addresses []string `json:"addresses,omitempty"` - netAddrs []caddy.NetworkAddress -} - // KeepAlive holds configuration pertaining to HTTP Keep-Alive. type KeepAlive struct { // Whether HTTP Keep-Alive is enabled. Default: true diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go index 1aabdfdb8f5..7456c900440 100644 --- a/modules/caddyhttp/reverseproxy/upstreams.go +++ b/modules/caddyhttp/reverseproxy/upstreams.go @@ -1,7 +1,9 @@ package reverseproxy import ( + "context" "fmt" + weakrand "math/rand" "net" "net/http" "strconv" @@ -45,6 +47,21 @@ type SRVUpstreams struct { // Results are cached between lookups. Default: 1m Refresh caddy.Duration `json:"refresh,omitempty"` + // Configures the DNS resolver used to resolve the + // SRV address to SRV records. + Resolver *UpstreamResolver `json:"resolver,omitempty"` + + // If Resolver is configured, how long to wait before + // timing out trying to connect to the DNS server. + DialTimeout caddy.Duration `json:"dial_timeout,omitempty"` + + // If Resolver is configured, how long to wait before + // spawning an RFC 6555 Fast Fallback connection. + // A negative value disables this. + FallbackDelay caddy.Duration `json:"dial_fallback_delay,omitempty"` + + resolver *net.Resolver + logger *zap.Logger } @@ -69,6 +86,29 @@ func (su *SRVUpstreams) Provision(ctx caddy.Context) error { if su.Refresh == 0 { su.Refresh = caddy.Duration(time.Minute) } + + if su.Resolver != nil { + err := su.Resolver.ParseAddresses() + if err != nil { + return err + } + d := &net.Dialer{ + Timeout: time.Duration(su.DialTimeout), + FallbackDelay: time.Duration(su.FallbackDelay), + } + su.resolver = &net.Resolver{ + PreferGo: true, + Dial: func(ctx context.Context, _, _ string) (net.Conn, error) { + //nolint:gosec + addr := su.Resolver.netAddrs[weakrand.Intn(len(su.Resolver.netAddrs))] + return d.DialContext(ctx, addr.Network, addr.JoinHostPort(0)) + }, + } + } + if su.resolver == nil { + su.resolver = net.DefaultResolver + } + return nil } @@ -101,7 +141,7 @@ func (su SRVUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) { proto := repl.ReplaceAll(su.Proto, "") name := repl.ReplaceAll(su.Name, "") - _, records, err := net.DefaultResolver.LookupSRV(r.Context(), service, proto, name) + _, records, err := su.resolver.LookupSRV(r.Context(), service, proto, name) if err != nil { // From LookupSRV docs: "If the response contains invalid names, those records are filtered // out and an error will be returned alongside the the remaining results, if any." Thus, we @@ -177,6 +217,21 @@ type AUpstreams struct { // The interval at which to refresh the A lookup. // Results are cached between lookups. Default: 1m Refresh caddy.Duration `json:"refresh,omitempty"` + + // Configures the DNS resolver used to resolve the + // domain name to A records. + Resolver *UpstreamResolver `json:"resolver,omitempty"` + + // If Resolver is configured, how long to wait before + // timing out trying to connect to the DNS server. 
+ DialTimeout caddy.Duration `json:"dial_timeout,omitempty"` + + // If Resolver is configured, how long to wait before + // spawning an RFC 6555 Fast Fallback connection. + // A negative value disables this. + FallbackDelay caddy.Duration `json:"dial_fallback_delay,omitempty"` + + resolver *net.Resolver } // CaddyModule returns the Caddy module information. @@ -196,6 +251,29 @@ func (au *AUpstreams) Provision(_ caddy.Context) error { if au.Port == "" { au.Port = "80" } + + if au.Resolver != nil { + err := au.Resolver.ParseAddresses() + if err != nil { + return err + } + d := &net.Dialer{ + Timeout: time.Duration(au.DialTimeout), + FallbackDelay: time.Duration(au.FallbackDelay), + } + au.resolver = &net.Resolver{ + PreferGo: true, + Dial: func(ctx context.Context, _, _ string) (net.Conn, error) { + //nolint:gosec + addr := au.Resolver.netAddrs[weakrand.Intn(len(au.Resolver.netAddrs))] + return d.DialContext(ctx, addr.Network, addr.JoinHostPort(0)) + }, + } + } + if au.resolver == nil { + au.resolver = net.DefaultResolver + } + return nil } @@ -226,7 +304,7 @@ func (au AUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) { name := repl.ReplaceAll(au.Name, "") port := repl.ReplaceAll(au.Port, "") - ips, err := net.DefaultResolver.LookupIPAddr(r.Context(), name) + ips, err := au.resolver.LookupIPAddr(r.Context(), name) if err != nil { return nil, err } @@ -265,6 +343,34 @@ func (al aLookup) isFresh() bool { return time.Since(al.freshness) < time.Duration(al.aUpstreams.Refresh) } +// UpstreamResolver holds the set of addresses of DNS resolvers of +// upstream addresses +type UpstreamResolver struct { + // The addresses of DNS resolvers to use when looking up the addresses of proxy upstreams. + // It accepts [network addresses](/docs/conventions#network-addresses) + // with port range of only 1. If the host is an IP address, it will be dialed directly to resolve the upstream server. + // If the host is not an IP address, the addresses are resolved using the [name resolution convention](https://golang.org/pkg/net/#hdr-Name_Resolution) of the Go standard library. + // If the array contains more than 1 resolver address, one is chosen at random. + Addresses []string `json:"addresses,omitempty"` + netAddrs []caddy.NetworkAddress +} + +// ParseAddresses parses all the configured network addresses +// and ensures they're ready to be used. 
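To make the resolver plumbing concrete before ParseAddresses below, here is a hypothetical wiring of the A-record source with a custom resolver, reusing the values from the adapt test earlier (a sketch only, not code from the patch):

	src := &AUpstreams{
		Name:    "foo",
		Port:    "9000",
		Refresh: caddy.Duration(5 * time.Minute),
		Resolver: &UpstreamResolver{
			// same resolver addresses as in the adapt test above
			Addresses: []string{"8.8.8.8", "8.8.4.4"},
		},
	}
	// Provision validates the addresses with ParseAddresses and installs
	// a net.Resolver that dials one of them at random per lookup.
	if err := src.Provision(caddy.Context{}); err != nil {
		panic(err) // e.g. a resolver address that parses to a port range
	}
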
+func (u UpstreamResolver) ParseAddresses() error {
+	for _, v := range u.Addresses {
+		addr, err := caddy.ParseNetworkAddress(v)
+		if err != nil {
+			return err
+		}
+		if addr.PortRangeSize() != 1 {
+			return fmt.Errorf("resolver address must have exactly one address; cannot call %v", addr)
+		}
+		u.netAddrs = append(u.netAddrs, addr)
+	}
+	return nil
+}
+
 var (
 	aAaaa   = make(map[string]aLookup)
 	aAaaaMu sync.RWMutex

From a2aebf550521e6e2dbd39b831fb03a5e27e39eb7 Mon Sep 17 00:00:00 2001
From: Francis Lavoie
Date: Mon, 24 Jan 2022 20:21:40 -0500
Subject: [PATCH 12/14] SRV: fix Caddyfile `name` inline arg, remove proto
 condition

---
 .../reverse_proxy_dynamic_upstreams.txt     | 10 ++++------
 modules/caddyhttp/reverseproxy/caddyfile.go | 10 ++--------
 modules/caddyhttp/reverseproxy/upstreams.go | 17 -----------------
 3 files changed, 6 insertions(+), 31 deletions(-)

diff --git a/caddytest/integration/caddyfile_adapt/reverse_proxy_dynamic_upstreams.txt b/caddytest/integration/caddyfile_adapt/reverse_proxy_dynamic_upstreams.txt
index 76dd41fda17..2f2cbcd3eb1 100644
--- a/caddytest/integration/caddyfile_adapt/reverse_proxy_dynamic_upstreams.txt
+++ b/caddytest/integration/caddyfile_adapt/reverse_proxy_dynamic_upstreams.txt
@@ -17,13 +17,13 @@
 
 :8885 {
 	reverse_proxy {
-		dynamic srv _api._http.example.com
+		dynamic srv _api._tcp.example.com
 	}
 
 	reverse_proxy {
 		dynamic srv {
 			service api
-			proto http
+			proto tcp
 			name example.com
 			refresh 5m
 			resolvers 8.8.8.8 8.8.4.4
@@ -83,9 +83,7 @@
 				"handle": [
 					{
 						"dynamic_upstreams": {
-							"name": "example.com",
-							"proto": "http",
-							"service": "api",
+							"name": "_api._tcp.example.com",
 							"source": "srv"
 						},
 						"handler": "reverse_proxy"
@@ -95,7 +93,7 @@
 						"dial_fallback_delay": -1000000000,
 						"dial_timeout": 1000000000,
 						"name": "example.com",
-						"proto": "http",
+						"proto": "tcp",
 						"refresh": 300000000000,
 						"resolver": {
 							"addresses": [
diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go
index 140c0c5694c..f5cac8ff2a5 100644
--- a/modules/caddyhttp/reverseproxy/caddyfile.go
+++ b/modules/caddyhttp/reverseproxy/caddyfile.go
@@ -1069,7 +1069,7 @@ func (h *HTTPTransport) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
 
 // UnmarshalCaddyfile deserializes Caddyfile tokens into u.
 //
-//     dynamic srv [<address>] {
+//     dynamic srv [<name>] {
 //         service             <service>
 //         proto               <proto>
 //         name                <name>
@@ -1086,13 +1086,7 @@ func (u *SRVUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
 		return d.ArgErr()
 	}
 	if len(args) > 0 {
-		service, proto, name, err := u.ParseAddress(args[0])
-		if err != nil {
-			return err
-		}
-		u.Service = service
-		u.Proto = proto
-		u.Name = name
+		u.Name = args[0]
 	}
diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go
index 7456c900440..0a86154fee7 100644
--- a/modules/caddyhttp/reverseproxy/upstreams.go
+++ b/modules/caddyhttp/reverseproxy/upstreams.go
@@ -7,7 +7,6 @@ import (
 	"net"
 	"net/http"
 	"strconv"
-	"strings"
 	"sync"
 	"time"
 
@@ -80,9 +79,6 @@ func (su SRVUpstreams) String() string {
 func (su *SRVUpstreams) Provision(ctx caddy.Context) error {
 	su.logger = ctx.Logger(su)
 
-	if su.Proto != "tcp" && su.Proto != "udp" {
-		return fmt.Errorf("invalid proto '%s'", su.Proto)
-	}
 	if su.Refresh == 0 {
 		su.Refresh = caddy.Duration(time.Minute)
 	}
@@ -176,19 +172,6 @@ func (su SRVUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
 	return upstreams, nil
 }
 
-// ParseAddress takes a full SRV address and parses it into
-// its three constituent parts. Used for parsing configuration
-// so that users may specify the address with dots instead of
-// specifying each part separately.
-func (SRVUpstreams) ParseAddress(address string) (string, string, string, error) {
-	parts := strings.SplitN(address, ".", 3)
-	if len(parts) != 3 {
-		return "", "", "", fmt.Errorf("expected 3 parts (service, proto, name), got %d parts", len(parts))
-	}
-
-	return strings.TrimLeft(parts[0], "_"), strings.TrimLeft(parts[1], "_"), strings.TrimLeft(parts[2], "_"), nil
-}
-
 type srvLookup struct {
 	srvUpstreams SRVUpstreams
 	freshness    time.Time

From 7bfab129645d683511fffc3ceb9407a3b929f659 Mon Sep 17 00:00:00 2001
From: Francis Lavoie
Date: Tue, 25 Jan 2022 02:45:04 -0500
Subject: [PATCH 13/14] Use pointer receiver

---
 modules/caddyhttp/reverseproxy/upstreams.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go
index 0a86154fee7..56fd3b946da 100644
--- a/modules/caddyhttp/reverseproxy/upstreams.go
+++ b/modules/caddyhttp/reverseproxy/upstreams.go
@@ -340,7 +340,7 @@ type UpstreamResolver struct {
 
 // ParseAddresses parses all the configured network addresses
 // and ensures they're ready to be used.
-func (u UpstreamResolver) ParseAddresses() error { +func (u *UpstreamResolver) ParseAddresses() error { for _, v := range u.Addresses { addr, err := caddy.ParseNetworkAddress(v) if err != nil { From 9a2842055f141dc1b84c14b4793e749807b61348 Mon Sep 17 00:00:00 2001 From: Matthew Holt Date: Fri, 11 Feb 2022 12:31:27 -0700 Subject: [PATCH 14/14] Add debug logs --- modules/caddyhttp/reverseproxy/reverseproxy.go | 5 +++++ modules/caddyhttp/reverseproxy/upstreams.go | 15 ++++++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index 6d75edd8b62..46d15ed23d7 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -438,6 +438,7 @@ func (h *Handler) proxyLoopIteration(r *http.Request, w http.ResponseWriter, pro for _, dUp := range dUpstreams { h.provisionUpstream(dUp) } + h.logger.Debug("provisioned dynamic upstreams", zap.Int("count", len(dUpstreams))) defer func() { // these upstreams are dynamic, so they are only used for this iteration // of the proxy loop; be sure to let them go away when we're done with them @@ -468,6 +469,10 @@ func (h *Handler) proxyLoopIteration(r *http.Request, w http.ResponseWriter, pro return true, fmt.Errorf("making dial info: %v", err) } + h.logger.Debug("selected upstream", + zap.String("dial", dialInfo.Address), + zap.Int("total_upstreams", len(upstreams))) + // attach to the request information about how to dial the upstream; // this is necessary because the information cannot be sufficiently // or satisfactorily represented in a URL diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go index 56fd3b946da..eb5845fc7b2 100644 --- a/modules/caddyhttp/reverseproxy/upstreams.go +++ b/modules/caddyhttp/reverseproxy/upstreams.go @@ -137,6 +137,11 @@ func (su SRVUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) { proto := repl.ReplaceAll(su.Proto, "") name := repl.ReplaceAll(su.Name, "") + su.logger.Debug("refreshing SRV upstreams", + zap.String("service", service), + zap.String("proto", proto), + zap.String("name", name)) + _, records, err := su.resolver.LookupSRV(r.Context(), service, proto, name) if err != nil { // From LookupSRV docs: "If the response contains invalid names, those records are filtered @@ -150,9 +155,13 @@ func (su SRVUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) { upstreams := make([]*Upstream, len(records)) for i, rec := range records { - upstreams[i] = &Upstream{ - Dial: net.JoinHostPort(rec.Target, strconv.Itoa(int(rec.Port))), - } + su.logger.Debug("discovered SRV record", + zap.String("target", rec.Target), + zap.Uint16("port", rec.Port), + zap.Uint16("priority", rec.Priority), + zap.Uint16("weight", rec.Weight)) + addr := net.JoinHostPort(rec.Target, strconv.Itoa(int(rec.Port))) + upstreams[i] = &Upstream{Dial: addr} } // before adding a new one to the cache (as opposed to replacing stale one), make room if cache is full
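Taken together, the SRV and A sources share one lookup-and-cache shape: consult the cache under a read lock, refresh through the (possibly custom) resolver when stale, and stamp the new result. A condensed sketch of that pattern with hypothetical names (the actual code uses the srvs and aAaaa maps with their RWMutexes, as in the hunks above):

	type lookup struct {
		freshness time.Time
		upstreams []*Upstream
	}

	var (
		cache   = make(map[string]lookup)
		cacheMu sync.RWMutex
	)

	// cachedUpstreams returns cached results while they are fresh and
	// otherwise re-runs the DNS query and refreshes the cache entry.
	func cachedUpstreams(key string, refresh time.Duration, fetch func() ([]*Upstream, error)) ([]*Upstream, error) {
		cacheMu.RLock()
		cached, ok := cache[key]
		cacheMu.RUnlock()
		if ok && time.Since(cached.freshness) < refresh {
			return cached.upstreams, nil // fresh enough; skip the lookup
		}
		ups, err := fetch()
		if err != nil {
			return nil, err
		}
		cacheMu.Lock()
		cache[key] = lookup{freshness: time.Now(), upstreams: ups}
		cacheMu.Unlock()
		return ups, nil
	}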