Skip to content

Commit

Permalink
reverseproxy: Begin refactor to enable dynamic upstreams
Browse files Browse the repository at this point in the history
  • Loading branch information
mholt committed Nov 24, 2021
1 parent 0ffb222 commit 9adeea5
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 72 deletions.
2 changes: 1 addition & 1 deletion modules/caddyhttp/reverseproxy/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er
return false
}

upstream, ok := val.(*upstreamHost)
upstream, ok := val.(*Host)
if !ok {
rangeErr = caddy.APIError{
HTTPStatus: http.StatusInternalServerError,
Expand Down
4 changes: 2 additions & 2 deletions modules/caddyhttp/reverseproxy/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (h *Handler) doActiveHealthCheckForAllHosts() {
// according to whether it passes the health check. An error is
// returned only if the health check fails to occur or if marking
// the host's health status fails.
func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host Host) error {
func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, host *Host) error {
// create the URL for the request that acts as a health check
scheme := "http"
if ht, ok := h.Transport.(TLSTransport); ok && ht.TLSEnabled() {
Expand Down Expand Up @@ -375,7 +375,7 @@ func (h *Handler) countFailure(upstream *Upstream) {
}

// forget it later
go func(host Host, failDuration time.Duration) {
go func(host *Host, failDuration time.Duration) {
defer func() {
if err := recover(); err != nil {
log.Printf("[PANIC] health check failure forgetter: %v\n%s", err, debug.Stack())
Expand Down
64 changes: 16 additions & 48 deletions modules/caddyhttp/reverseproxy/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,44 +26,13 @@ import (
"github.com/caddyserver/caddy/v2/modules/caddyhttp"
)

// Host represents a remote host which can be proxied to.
// Its methods must be safe for concurrent use.
type Host interface {
// NumRequests returns the number of requests
// currently in process with the host.
NumRequests() int

// Fails returns the count of recent failures.
Fails() int

// Unhealthy returns true if the backend is unhealthy.
Unhealthy() bool

// CountRequest atomically counts the given number of
// requests as currently in process with the host. The
// count should not go below 0.
CountRequest(int) error

// CountFail atomically counts the given number of
// failures with the host. The count should not go
// below 0.
CountFail(int) error

// SetHealthy atomically marks the host as either
// healthy (true) or unhealthy (false). If the given
// status is the same, this should be a no-op and
// return false. It returns true if the status was
// changed; i.e. if it is now different from before.
SetHealthy(bool) (bool, error)
}

// UpstreamPool is a collection of upstreams.
type UpstreamPool []*Upstream

// Upstream bridges this proxy's configuration to the
// state of the backend host it is correlated with.
type Upstream struct {
Host `json:"-"`
*Host `json:"-"`

// The [network address](/docs/conventions#network-addresses)
// to dial to connect to the upstream. Must represent precisely
Expand Down Expand Up @@ -174,34 +143,33 @@ func (u *Upstream) fillDialInfo(r *http.Request) (DialInfo, error) {
}, nil
}

// upstreamHost is the basic, in-memory representation
// of the state of a remote host. It implements the
// Host interface.
type upstreamHost struct {
// Host is the basic, in-memory representation
// of the state of a remote host.
type Host struct {
numRequests int64 // must be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
fails int64
unhealthy int32
}

// NumRequests returns the number of active requests to the upstream.
func (uh *upstreamHost) NumRequests() int {
return int(atomic.LoadInt64(&uh.numRequests))
func (h *Host) NumRequests() int {
return int(atomic.LoadInt64(&h.numRequests))
}

// Fails returns the number of recent failures with the upstream.
func (uh *upstreamHost) Fails() int {
return int(atomic.LoadInt64(&uh.fails))
func (h *Host) Fails() int {
return int(atomic.LoadInt64(&h.fails))
}

// Unhealthy returns whether the upstream is healthy.
func (uh *upstreamHost) Unhealthy() bool {
return atomic.LoadInt32(&uh.unhealthy) == 1
func (h *Host) Unhealthy() bool {
return atomic.LoadInt32(&h.unhealthy) == 1
}

// CountRequest mutates the active request count by
// delta. It returns an error if the adjustment fails.
func (uh *upstreamHost) CountRequest(delta int) error {
result := atomic.AddInt64(&uh.numRequests, int64(delta))
func (h *Host) CountRequest(delta int) error {
result := atomic.AddInt64(&h.numRequests, int64(delta))
if result < 0 {
return fmt.Errorf("count below 0: %d", result)
}
Expand All @@ -210,8 +178,8 @@ func (uh *upstreamHost) CountRequest(delta int) error {

// CountFail mutates the recent failures count by
// delta. It returns an error if the adjustment fails.
func (uh *upstreamHost) CountFail(delta int) error {
result := atomic.AddInt64(&uh.fails, int64(delta))
func (h *Host) CountFail(delta int) error {
result := atomic.AddInt64(&h.fails, int64(delta))
if result < 0 {
return fmt.Errorf("count below 0: %d", result)
}
Expand All @@ -220,12 +188,12 @@ func (uh *upstreamHost) CountFail(delta int) error {

// SetHealthy sets the upstream has healthy or unhealthy
// and returns true if the new value is different.
func (uh *upstreamHost) SetHealthy(healthy bool) (bool, error) {
func (h *Host) SetHealthy(healthy bool) (bool, error) {
var unhealthy, compare int32 = 1, 0
if healthy {
unhealthy, compare = 0, 1
}
swapped := atomic.CompareAndSwapInt32(&uh.unhealthy, compare, unhealthy)
swapped := atomic.CompareAndSwapInt32(&h.unhealthy, compare, unhealthy)
return swapped, nil
}

Expand Down
39 changes: 34 additions & 5 deletions modules/caddyhttp/reverseproxy/reverseproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type Handler struct {
// Upstreams is the list of backends to proxy to.
Upstreams UpstreamPool `json:"upstreams,omitempty"`

// TODO: godoc
DynamicUpstreamsRaw json.RawMessage `json:"dynamic_upstreams,omitempty" caddy:"namespace=http.reverse_proxy.upstreams inline_key=source"`

// Adjusts how often to flush the response buffer. By default,
// no periodic flushing is done. A negative value disables
// response buffering, and flushes immediately after each
Expand Down Expand Up @@ -130,8 +133,9 @@ type Handler struct {
// - `{http.reverse_proxy.header.*}` The headers from the response
HandleResponse []caddyhttp.ResponseHandler `json:"handle_response,omitempty"`

Transport http.RoundTripper `json:"-"`
CB CircuitBreaker `json:"-"`
Transport http.RoundTripper `json:"-"`
CB CircuitBreaker `json:"-"`
DynamicUpstreams UpstreamSource `json:"-"`

// Holds the named response matchers from the Caddyfile while adapting
responseMatchers map[string]caddyhttp.ResponseMatcher
Expand Down Expand Up @@ -191,6 +195,13 @@ func (h *Handler) Provision(ctx caddy.Context) error {
}
h.CB = mod.(CircuitBreaker)
}
if h.DynamicUpstreamsRaw != nil {
mod, err := ctx.LoadModule(h, "DynamicUpstreamsRaw")
if err != nil {
return fmt.Errorf("loading upstream source module: %v", err)
}
h.DynamicUpstreams = mod.(UpstreamSource)
}

// ensure any embedded headers handler module gets provisioned
// (see https://caddy.community/t/set-cookie-manipulation-in-reverse-proxy/7666?u=matt
Expand Down Expand Up @@ -245,10 +256,10 @@ func (h *Handler) Provision(ctx caddy.Context) error {
// set up upstreams
for _, upstream := range h.Upstreams {
// create or get the host representation for this upstream
var host Host = new(upstreamHost)
host := new(Host)
existingHost, loaded := hosts.LoadOrStore(upstream.String(), host)
if loaded {
host = existingHost.(Host)
host = existingHost.(*Host)
}
upstream.Host = host

Expand Down Expand Up @@ -406,10 +417,21 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
repl.Set("http.reverse_proxy.duration", time.Since(start))
}()

// get the list of upstreams
upstreams := h.Upstreams
if h.DynamicUpstreams != nil {
dUpstreams, err := h.DynamicUpstreams.GetUpstreams(r)
if err != nil {
h.logger.Error("failed getting dynamic upstreams; falling back to static upstreams", zap.Error(err))
} else {
upstreams = dUpstreams
}
}

var proxyErr error
for {
// choose an available upstream
upstream := h.LoadBalancing.SelectionPolicy.Select(h.Upstreams, r, w)
upstream := h.LoadBalancing.SelectionPolicy.Select(upstreams, r, w)
if upstream == nil {
if proxyErr == nil {
proxyErr = fmt.Errorf("no upstreams available")
Expand Down Expand Up @@ -908,6 +930,13 @@ type Selector interface {
Select(UpstreamPool, *http.Request, http.ResponseWriter) *Upstream
}

// UpstreamSource gets the list of upstreams that can be used when
// proxying a request. Returned upstreams will be load balanced and
// health-checked.
type UpstreamSource interface {
GetUpstreams(*http.Request) ([]*Upstream, error)
}

// Hop-by-hop headers. These are removed when sent to the backend.
// As of RFC 7230, hop-by-hop headers are required to appear in the
// Connection header field. These are the headers defined by the
Expand Down
32 changes: 16 additions & 16 deletions modules/caddyhttp/reverseproxy/selectionpolicies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (

func testPool() UpstreamPool {
return UpstreamPool{
{Host: new(upstreamHost)},
{Host: new(upstreamHost)},
{Host: new(upstreamHost)},
{Host: new(Host)},
{Host: new(Host)},
{Host: new(Host)},
}
}

Expand Down Expand Up @@ -167,8 +167,8 @@ func TestIPHashPolicy(t *testing.T) {
// We should be able to resize the host pool and still be able to predict
// where a req will be routed with the same IP's used above
pool = UpstreamPool{
{Host: new(upstreamHost)},
{Host: new(upstreamHost)},
{Host: new(Host)},
{Host: new(Host)},
}
req.RemoteAddr = "172.0.0.1:80"
h = ipHash.Select(pool, req, nil)
Expand Down Expand Up @@ -201,15 +201,15 @@ func TestIPHashPolicy(t *testing.T) {

// Reproduce #4135
pool = UpstreamPool{
{Host: new(upstreamHost)},
{Host: new(upstreamHost)},
{Host: new(upstreamHost)},
{Host: new(upstreamHost)},
{Host: new(upstreamHost)},
{Host: new(upstreamHost)},
{Host: new(upstreamHost)},
{Host: new(upstreamHost)},
{Host: new(upstreamHost)},
{Host: new(Host)},
{Host: new(Host)},
{Host: new(Host)},
{Host: new(Host)},
{Host: new(Host)},
{Host: new(Host)},
{Host: new(Host)},
{Host: new(Host)},
{Host: new(Host)},
}
pool[0].SetHealthy(false)
pool[1].SetHealthy(false)
Expand Down Expand Up @@ -271,8 +271,8 @@ func TestURIHashPolicy(t *testing.T) {
// We should be able to resize the host pool and still be able to predict
// where a request will be routed with the same URI's used above
pool = UpstreamPool{
{Host: new(upstreamHost)},
{Host: new(upstreamHost)},
{Host: new(Host)},
{Host: new(Host)},
}

request = httptest.NewRequest(http.MethodGet, "/test", nil)
Expand Down

0 comments on commit 9adeea5

Please sign in to comment.