Skip to content

Commit

Permalink
Request forwarding (#1721)
Browse files Browse the repository at this point in the history
Add request forwarding.
  • Loading branch information
jefferai committed Aug 15, 2016
1 parent 5eaab9f commit 6455400
Show file tree
Hide file tree
Showing 60 changed files with 12,548 additions and 420 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Expand Up @@ -2,6 +2,11 @@

DEPRECATIONS/BREAKING CHANGES:

* Once the active node is 0.6.1, standby nodes must also be 0.6.1 in order to
connect to the HA cluster. We recommend following our [general upgrade
instructions](https://www.vaultproject.io/docs/install/upgrade.html) in
addition to 0.6.1-specific upgrade instructions to ensure that this is not
an issue.
* Root tokens (tokens with the `root` policy) can no longer be created except
by another root token or the `generate-root` endpoint.
* Issued certificates from the `pki` backend against new roles created or
Expand Down Expand Up @@ -104,7 +109,7 @@ IMPROVEMENTS:
[GH-1699]
* physical/etcd: Support `ETCD_ADDR` env var for specifying addresses [GH-1576]
* physical/consul: Allowing additional tags to be added to Consul service
registration via `service-tags` option [GH-1643]
registration via `service_tags` option [GH-1643]
* secret/aws: Listing of roles is supported now [GH-1546]
* secret/cassandra: Add `connect_timeout` value for Cassandra connection
configuration [GH-1581]
Expand Down
243 changes: 179 additions & 64 deletions command/server.go
Expand Up @@ -54,7 +54,7 @@ type ServerCommand struct {
}

func (c *ServerCommand) Run(args []string) int {
var dev, verifyOnly bool
var dev, verifyOnly, devHA bool
var configPath []string
var logLevel, devRootTokenID, devListenAddress string
flags := c.Meta.FlagSet("server", meta.FlagSetDefault)
Expand All @@ -63,6 +63,7 @@ func (c *ServerCommand) Run(args []string) int {
flags.StringVar(&devListenAddress, "dev-listen-address", "", "")
flags.StringVar(&logLevel, "log-level", "info", "")
flags.BoolVar(&verifyOnly, "verify-only", false, "")
flags.BoolVar(&devHA, "dev-ha", false, "")
flags.Usage = func() { c.Ui.Error(c.Help()) }
flags.Var((*sliceflag.StringFlag)(&configPath), "config", "config")
if err := flags.Parse(args); err != nil {
Expand Down Expand Up @@ -98,7 +99,7 @@ func (c *ServerCommand) Run(args []string) int {
// Load the configuration
var config *server.Config
if dev {
config = server.DevConfig()
config = server.DevConfig(devHA)
if devListenAddress != "" {
config.Listeners[0].Config["address"] = devListenAddress
}
Expand Down Expand Up @@ -179,7 +180,7 @@ func (c *ServerCommand) Run(args []string) int {

coreConfig := &vault.CoreConfig{
Physical: backend,
AdvertiseAddr: config.Backend.AdvertiseAddr,
RedirectAddr: config.Backend.RedirectAddr,
HAPhysical: nil,
Seal: seal,
AuditBackends: c.AuditBackends,
Expand All @@ -193,6 +194,8 @@ func (c *ServerCommand) Run(args []string) int {
ClusterName: config.ClusterName,
}

var disableClustering bool

// Initialize the separate HA physical backend, if it exists
var ok bool
if config.HABackend != nil {
Expand All @@ -215,75 +218,92 @@ func (c *ServerCommand) Run(args []string) int {
return 1
}

coreConfig.AdvertiseAddr = config.HABackend.AdvertiseAddr
coreConfig.RedirectAddr = config.HABackend.RedirectAddr
disableClustering = config.HABackend.DisableClustering
if !disableClustering {
coreConfig.ClusterAddr = config.HABackend.ClusterAddr
}
} else {
if coreConfig.HAPhysical, ok = backend.(physical.HABackend); ok {
coreConfig.AdvertiseAddr = config.Backend.AdvertiseAddr
coreConfig.RedirectAddr = config.Backend.RedirectAddr
disableClustering = config.Backend.DisableClustering
if !disableClustering {
coreConfig.ClusterAddr = config.Backend.ClusterAddr
}
}
}

if envAA := os.Getenv("VAULT_ADVERTISE_ADDR"); envAA != "" {
coreConfig.AdvertiseAddr = envAA
if envRA := os.Getenv("VAULT_REDIRECT_ADDR"); envRA != "" {
coreConfig.RedirectAddr = envRA
} else if envAA := os.Getenv("VAULT_ADVERTISE_ADDR"); envAA != "" {
coreConfig.RedirectAddr = envAA
}

// Attempt to detect the advertise address, if possible
var detect physical.AdvertiseDetect
// Attempt to detect the redirect address, if possible
var detect physical.RedirectDetect
if coreConfig.HAPhysical != nil && coreConfig.HAPhysical.HAEnabled() {
detect, ok = coreConfig.HAPhysical.(physical.AdvertiseDetect)
detect, ok = coreConfig.HAPhysical.(physical.RedirectDetect)
} else {
detect, ok = coreConfig.Physical.(physical.AdvertiseDetect)
detect, ok = coreConfig.Physical.(physical.RedirectDetect)
}
if ok && coreConfig.AdvertiseAddr == "" {
advertise, err := c.detectAdvertise(detect, config)
if ok && coreConfig.RedirectAddr == "" {
redirect, err := c.detectRedirect(detect, config)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error detecting advertise address: %s", err))
} else if advertise == "" {
c.Ui.Error("Failed to detect advertise address.")
c.Ui.Error(fmt.Sprintf("Error detecting redirect address: %s", err))
} else if redirect == "" {
c.Ui.Error("Failed to detect redirect address.")
} else {
coreConfig.AdvertiseAddr = advertise
coreConfig.RedirectAddr = redirect
}
}

// Initialize the core
core, newCoreError := vault.NewCore(coreConfig)
if newCoreError != nil {
if !errwrap.ContainsType(newCoreError, new(vault.NonFatalError)) {
c.Ui.Error(fmt.Sprintf("Error initializing core: %s", newCoreError))
// After the redirect bits are sorted out, if no cluster address was
// explicitly given, derive one from the redirect addr
if disableClustering {
coreConfig.ClusterAddr = ""
} else if envCA := os.Getenv("VAULT_CLUSTER_ADDR"); envCA != "" {
coreConfig.ClusterAddr = envCA
} else if coreConfig.ClusterAddr == "" && coreConfig.RedirectAddr != "" {
u, err := url.ParseRequestURI(coreConfig.RedirectAddr)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing redirect address %s: %v", coreConfig.RedirectAddr, err))
return 1
}
host, port, err := net.SplitHostPort(u.Host)
nPort, nPortErr := strconv.Atoi(port)
if err != nil {
// assume it's due to there not being a port specified, in which case
// use 443
host = u.Host
nPort = 443
}
if nPortErr != nil {
c.Ui.Error(fmt.Sprintf("Cannot parse %s as a numeric port: %v", port, nPortErr))
return 1
}
u.Host = net.JoinHostPort(host, strconv.Itoa(nPort+1))
// Will always be TLS-secured
u.Scheme = "https"
coreConfig.ClusterAddr = u.String()
}

// If we're in dev mode, then initialize the core
if dev {
init, err := c.enableDev(core, devRootTokenID)
if coreConfig.ClusterAddr != "" {
// Force https as we'll always be TLS-secured
u, err := url.ParseRequestURI(coreConfig.ClusterAddr)
if err != nil {
c.Ui.Error(fmt.Sprintf(
"Error initializing dev mode: %s", err))
c.Ui.Error(fmt.Sprintf("Error parsing cluster address %s: %v", coreConfig.RedirectAddr, err))
return 1
}
u.Scheme = "https"
coreConfig.ClusterAddr = u.String()
}

export := "export"
quote := "'"
if runtime.GOOS == "windows" {
export = "set"
quote = ""
// Initialize the core
core, newCoreError := vault.NewCore(coreConfig)
if newCoreError != nil {
if !errwrap.ContainsType(newCoreError, new(vault.NonFatalError)) {
c.Ui.Error(fmt.Sprintf("Error initializing core: %s", newCoreError))
return 1
}

c.Ui.Output(fmt.Sprintf(
"==> WARNING: Dev mode is enabled!\n\n"+
"In this mode, Vault is completely in-memory and unsealed.\n"+
"Vault is configured to only have a single unseal key. The root\n"+
"token has already been authenticated with the CLI, so you can\n"+
"immediately begin using the Vault CLI.\n\n"+
"The only step you need to take is to set the following\n"+
"environment variables:\n\n"+
" "+export+" VAULT_ADDR="+quote+"http://"+config.Listeners[0].Config["address"]+quote+"\n\n"+
"The unseal key and root token are reproduced below in case you\n"+
"want to seal/unseal the Vault or play with authentication.\n\n"+
"Unseal Key: %s\nRoot Token: %s\n",
hex.EncodeToString(init.SecretShares[0]),
init.RootToken,
))
}

// Compile server information for output later
Expand All @@ -296,21 +316,31 @@ func (c *ServerCommand) Run(args []string) int {

if config.HABackend != nil {
info["HA backend"] = config.HABackend.Type
info["advertise address"] = coreConfig.AdvertiseAddr
infoKeys = append(infoKeys, "HA backend", "advertise address")
info["redirect address"] = coreConfig.RedirectAddr
infoKeys = append(infoKeys, "HA backend", "redirect address")
if coreConfig.ClusterAddr != "" {
info["cluster address"] = coreConfig.ClusterAddr
infoKeys = append(infoKeys, "cluster address")
}
} else {
// If the backend supports HA, then note it
if coreConfig.HAPhysical != nil {
if coreConfig.HAPhysical.HAEnabled() {
info["backend"] += " (HA available)"
info["advertise address"] = coreConfig.AdvertiseAddr
infoKeys = append(infoKeys, "advertise address")
info["redirect address"] = coreConfig.RedirectAddr
infoKeys = append(infoKeys, "redirect address")
if coreConfig.ClusterAddr != "" {
info["cluster address"] = coreConfig.ClusterAddr
infoKeys = append(infoKeys, "cluster address")
}
} else {
info["backend"] += " (HA disabled)"
}
}
}

clusterAddrs := []string{}

// Initialize the listeners
lns := make([]net.Listener, 0, len(config.Listeners))
for i, lnConfig := range config.Listeners {
Expand All @@ -322,6 +352,35 @@ func (c *ServerCommand) Run(args []string) int {
return 1
}

lns = append(lns, ln)

if reloadFunc != nil {
relSlice := c.ReloadFuncs["listener|"+lnConfig.Type]
relSlice = append(relSlice, reloadFunc)
c.ReloadFuncs["listener|"+lnConfig.Type] = relSlice
}

if !disableClustering && lnConfig.Type == "tcp" {
var addr string
var ok bool
if addr, ok = lnConfig.Config["cluster_address"]; ok {
clusterAddrs = append(clusterAddrs, addr)
} else {
tcpAddr, ok := ln.Addr().(*net.TCPAddr)
if !ok {
c.Ui.Error("Failed to parse tcp listener")
return 1
}
ipStr := tcpAddr.IP.String()
if len(tcpAddr.IP) == net.IPv6len {
ipStr = fmt.Sprintf("[%s]", ipStr)
}
addr = fmt.Sprintf("%s:%d", ipStr, tcpAddr.Port+1)
clusterAddrs = append(clusterAddrs, addr)
}
props["cluster address"] = addr
}

// Store the listener props for output later
key := fmt.Sprintf("listener %d", i+1)
propsList := make([]string, 0, len(props))
Expand All @@ -334,13 +393,9 @@ func (c *ServerCommand) Run(args []string) int {
info[key] = fmt.Sprintf(
"%s (%s)", lnConfig.Type, strings.Join(propsList, ", "))

lns = append(lns, ln)

if reloadFunc != nil {
relSlice := c.ReloadFuncs["listener|"+lnConfig.Type]
relSlice = append(relSlice, reloadFunc)
c.ReloadFuncs["listener|"+lnConfig.Type] = relSlice
}
}
if !disableClustering {
c.logger.Printf("[TRACE] cluster listeners will be started on %v", clusterAddrs)
}

// Make sure we close all listeners from this point on
Expand Down Expand Up @@ -394,16 +449,55 @@ func (c *ServerCommand) Run(args []string) int {
return true
}

if err := sd.RunServiceDiscovery(c.WaitGroup, c.ShutdownCh, coreConfig.AdvertiseAddr, activeFunc, sealedFunc); err != nil {
if err := sd.RunServiceDiscovery(c.WaitGroup, c.ShutdownCh, coreConfig.RedirectAddr, activeFunc, sealedFunc); err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing service discovery: %v", err))
return 1
}
}
}

handler := vaulthttp.Handler(core)

// This needs to happen before we first unseal, so before we trigger dev
// mode if it's set
core.SetClusterListenerSetupFunc(vault.WrapListenersForClustering(clusterAddrs, handler, c.logger))

// If we're in dev mode, then initialize the core
if dev {
init, err := c.enableDev(core, devRootTokenID)
if err != nil {
c.Ui.Error(fmt.Sprintf(
"Error initializing dev mode: %s", err))
return 1
}

export := "export"
quote := "'"
if runtime.GOOS == "windows" {
export = "set"
quote = ""
}

c.Ui.Output(fmt.Sprintf(
"==> WARNING: Dev mode is enabled!\n\n"+
"In this mode, Vault is completely in-memory and unsealed.\n"+
"Vault is configured to only have a single unseal key. The root\n"+
"token has already been authenticated with the CLI, so you can\n"+
"immediately begin using the Vault CLI.\n\n"+
"The only step you need to take is to set the following\n"+
"environment variables:\n\n"+
" "+export+" VAULT_ADDR="+quote+"http://"+config.Listeners[0].Config["address"]+quote+"\n\n"+
"The unseal key and root token are reproduced below in case you\n"+
"want to seal/unseal the Vault or play with authentication.\n\n"+
"Unseal Key: %s\nRoot Token: %s\n",
hex.EncodeToString(init.SecretShares[0]),
init.RootToken,
))
}

// Initialize the HTTP server
server := &http.Server{}
server.Handler = vaulthttp.Handler(core)
server.Handler = handler
for _, ln := range lns {
go server.Serve(ln)
}
Expand Down Expand Up @@ -466,6 +560,27 @@ func (c *ServerCommand) enableDev(core *vault.Core, rootTokenID string) (*vault.
return nil, fmt.Errorf("failed to unseal Vault for dev mode")
}

isLeader, _, err := core.Leader()
if err != nil && err != vault.ErrHANotEnabled {
return nil, fmt.Errorf("failed to check active status: %v", err)
}
if err == nil {
leaderCount := 5
for !isLeader {
if leaderCount == 0 {
buf := make([]byte, 1<<16)
runtime.Stack(buf, true)
return nil, fmt.Errorf("failed to get active status after five seconds; call stack is\n%s\n", buf)
}
time.Sleep(1 * time.Second)
isLeader, _, err = core.Leader()
if err != nil {
return nil, fmt.Errorf("failed to check active status: %v", err)
}
leaderCount--
}
}

if rootTokenID != "" {
req := &logical.Request{
Operation: logical.UpdateOperation,
Expand Down Expand Up @@ -511,8 +626,8 @@ func (c *ServerCommand) enableDev(core *vault.Core, rootTokenID string) (*vault.
return init, nil
}

// detectAdvertise is used to attempt advertise address detection
func (c *ServerCommand) detectAdvertise(detect physical.AdvertiseDetect,
// detectRedirect is used to attempt redirect address detection
func (c *ServerCommand) detectRedirect(detect physical.RedirectDetect,
config *server.Config) (string, error) {
// Get the hostname
host, err := detect.DetectHostAddr()
Expand Down

0 comments on commit 6455400

Please sign in to comment.