Skip to content

Commit

Permalink
plugins/bundle: Support for saving and reading bundles from disk
Browse files Browse the repository at this point in the history
This commit adds support to persist and load bundles from disk.
A new field is introduced in OPA's bundle configuration that can
be optionally set to enable OPA to write and read bundles from disk.
This feature will allow OPA to serve policy decisions in scenarios
such as OPA being unable to communicate with the bundle server.

Fixes #2097

Signed-off-by: Ashutosh Narkar <anarkar4387@gmail.com>
  • Loading branch information
ashutosh-narkar committed Sep 30, 2020
1 parent b535cbf commit 5a79a45
Show file tree
Hide file tree
Showing 5 changed files with 583 additions and 26 deletions.
2 changes: 2 additions & 0 deletions docs/content/configuration.md
Expand Up @@ -39,6 +39,7 @@ bundles:
authz:
service: acmecorp
resource: bundles/http/example/authz.tar.gz
persist: true
polling:
min_delay_seconds: 60
max_delay_seconds: 120
Expand Down Expand Up @@ -388,6 +389,7 @@ included in the actual bundle gzipped tarball.
| `bundles[_].service` | `string` | Yes | Name of service to use to contact remote server. |
| `bundles[_].polling.min_delay_seconds` | `int64` | No (default: `60`) | Minimum amount of time to wait between bundle downloads. |
| `bundles[_].polling.max_delay_seconds` | `int64` | No (default: `120`) | Maximum amount of time to wait between bundle downloads. |
| `bundles[_].persist` | `bool` | No | Persist activated bundles to disk. |
| `bundles[_].signing.keyid` | `string` | No | Name of the key to use for bundle signature verification. |
| `bundles[_].signing.scope` | `string` | No | Scope to use for bundle signature verification. |
| `bundles[_].signing.exclude_files` | `array` | No | Files in the bundle to exclude during verification. |
Expand Down
5 changes: 5 additions & 0 deletions docs/content/management.md
Expand Up @@ -97,6 +97,7 @@ bundles:
authz:
service: acmecorp
resource: somedir/bundle.tar.gz
persist: true
polling:
min_delay_seconds: 10
max_delay_seconds: 20
Expand Down Expand Up @@ -125,6 +126,10 @@ be useful when relying on default `resource` behavior with a name like
`authz/bundle.tar.gz` which results in a `resource` of
`bundles/authz/bundle.tar.gz`.

OPA can optionally save and read bundles from disk based on the value of the `bundles[_].persist`
field. If this field is set, OPA will persist activated bundles to disk and load bundles from disk too in scenarios such as
OPA being unable to communicate with the bundle server.

The optional `bundles[_].signing` field can be used to specify the `keyid` and `scope` that should be used
for verifying the signature of the bundle. See [this](#bundle-signature) section for details.

Expand Down
2 changes: 2 additions & 0 deletions plugins/bundle/config.go
Expand Up @@ -42,6 +42,7 @@ func ParseConfig(config []byte, services []string) (*Config, error) {
Service: parsedConfig.Service,
Resource: parsedConfig.generateLegacyResourcePath(),
Signing: nil,
Persist: false,
},
}

Expand Down Expand Up @@ -135,6 +136,7 @@ type Source struct {
Service string `json:"service"`
Resource string `json:"resource"`
Signing *bundle.VerificationConfig `json:"signing"`
Persist bool `json:"persist"`
}

// IsMultiBundle returns whether or not the config is the newer multi-bundle
Expand Down
208 changes: 182 additions & 26 deletions plugins/bundle/plugin.go
Expand Up @@ -6,9 +6,13 @@
package bundle

import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"sync"
"time"
Expand All @@ -27,17 +31,18 @@ import (

// Plugin implements bundle activation.
type Plugin struct {
config Config
manager *plugins.Manager // plugin manager for storage and service clients
status map[string]*Status // current status for each bundle
etags map[string]string // etag on last successful activation
listeners map[interface{}]func(Status) // listeners to send status updates to
bulkListeners map[interface{}]func(map[string]*Status) // listeners to send aggregated status updates to
downloaders map[string]*download.Downloader
mtx sync.Mutex
cfgMtx sync.Mutex
legacyConfig bool
ready bool
config Config
manager *plugins.Manager // plugin manager for storage and service clients
status map[string]*Status // current status for each bundle
etags map[string]string // etag on last successful activation
listeners map[interface{}]func(Status) // listeners to send status updates to
bulkListeners map[interface{}]func(map[string]*Status) // listeners to send aggregated status updates to
downloaders map[string]*download.Downloader
mtx sync.Mutex
cfgMtx sync.Mutex
legacyConfig bool
ready bool
bundlePersistPath string
}

// New returns a new Plugin with the given config.
Expand Down Expand Up @@ -79,6 +84,19 @@ func Lookup(manager *plugins.Manager) *Plugin {
func (p *Plugin) Start(ctx context.Context) error {
p.mtx.Lock()
defer p.mtx.Unlock()

var err error

p.bundlePersistPath, err = getDefaultBundlePersistPath()
if err != nil {
return err
}

err = p.loadAndActivateBundlesFromDisk(ctx)
if err != nil {
return err
}

p.initDownloaders()
for name, dl := range p.downloaders {
p.logInfo(name, "Starting bundle downloader.")
Expand Down Expand Up @@ -245,6 +263,38 @@ func (p *Plugin) initDownloaders() {
}
}

func (p *Plugin) loadAndActivateBundlesFromDisk(ctx context.Context) error {
for name := range p.config.Bundles {
if p.persistBundle(name) {
b, err := loadBundleFromDisk(p.bundlePersistPath, name)
if err != nil {
p.logError(name, "Failed to load bundle from disk: %v", err)
return err
}

if b == nil {
return nil
}

p.status[name].Metrics = metrics.New()

err = p.activate(ctx, name, b)
if err != nil {
p.logError(name, "Bundle activation failed: %v", err)
return err
}

p.status[name].SetError(nil)
p.status[name].SetActivateSuccess(b.Manifest.Revision)

p.checkPluginReadiness()

p.logDebug(name, "Bundle loaded from disk and activated successfully.")
}
}
return nil
}

func (p *Plugin) newDownloader(name string, source *Source) *download.Downloader {
conf := source.Config
client := p.manager.Client(source.Service)
Expand Down Expand Up @@ -311,8 +361,22 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) {
return
}

if p.persistBundle(name) {
p.logDebug(name, "Persisting bundle to disk in progress.")

err := p.saveBundleToDisk(name, u.Bundle)
if err != nil {
p.logError(name, "Persisting bundle to disk failed: %v", err)
p.status[name].SetError(err)
p.downloaders[name].ClearCache()
return
}
p.logDebug(name, "Bundle persisted to disk successfully at path %v.", filepath.Join(p.bundlePersistPath, name))
}

p.status[name].SetError(nil)
p.status[name].SetActivateSuccess(u.Bundle.Manifest.Revision)

if u.ETag != "" {
p.logInfo(name, "Bundle downloaded and activated successfully. Etag updated to %v.", u.ETag)
} else {
Expand All @@ -321,21 +385,7 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) {
p.etags[name] = u.ETag

// If the plugin wasn't ready yet then check if we are now after activating this bundle.
if !p.ready {
readyNow := true // optimistically
for _, status := range p.status {
if len(status.Errors) > 0 || (status.LastSuccessfulActivation == time.Time{}) {
readyNow = false // Not ready yet, check again on next bundle activation.
break
}
}

if readyNow {
p.ready = true
p.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateOK})
}

}
p.checkPluginReadiness()
return
}

Expand All @@ -346,6 +396,23 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) {
}
}

func (p *Plugin) checkPluginReadiness() {
if !p.ready {
readyNow := true // optimistically
for _, status := range p.status {
if len(status.Errors) > 0 || (status.LastSuccessfulActivation == time.Time{}) {
readyNow = false // Not ready yet, check again on next bundle activation.
break
}
}

if readyNow {
p.ready = true
p.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateOK})
}
}
}

func (p *Plugin) activate(ctx context.Context, name string, b *bundle.Bundle) error {
p.logDebug(name, "Bundle activation in progress. Opening storage transaction.")

Expand Down Expand Up @@ -385,6 +452,15 @@ func (p *Plugin) activate(ctx context.Context, name string, b *bundle.Bundle) er
return err
}

func (p *Plugin) persistBundle(name string) bool {
bundleSrc := p.config.Bundles[name]

if bundleSrc == nil {
return false
}
return bundleSrc.Persist
}

func (p *Plugin) logError(bundleName string, fmt string, a ...interface{}) {
logrus.WithFields(p.logrusFields(bundleName)).Errorf(fmt, a...)
}
Expand All @@ -397,6 +473,10 @@ func (p *Plugin) logDebug(bundleName string, fmt string, a ...interface{}) {
logrus.WithFields(p.logrusFields(bundleName)).Debugf(fmt, a...)
}

func (p *Plugin) logWarn(bundleName string, fmt string, a ...interface{}) {
logrus.WithFields(p.logrusFields(bundleName)).Warnf(fmt, a...)
}

func (p *Plugin) logrusFields(bundleName string) logrus.Fields {

f := logrus.Fields{
Expand Down Expand Up @@ -429,3 +509,79 @@ func (p *Plugin) configDelta(newConfig *Config) (map[string]*Source, map[string]

return newBundles, updatedBundles, deletedBundles
}

func (p *Plugin) saveBundleToDisk(name string, b *bundle.Bundle) error {

bundleDir := filepath.Join(p.bundlePersistPath, name)
tmpFile := filepath.Join(bundleDir, ".bundle.tar.gz.tmp")
bundleFile := filepath.Join(bundleDir, "bundle.tar.gz")

saveErr := saveCurrentBundleToDisk(bundleDir, ".bundle.tar.gz.tmp", b)
if saveErr != nil {
p.logWarn(name, "Failed to save new bundle to disk: %v", saveErr)

if err := os.Remove(tmpFile); err != nil {
p.logWarn(name, "Failed to remove temp file ('%s'): %v", tmpFile, err)
}

if _, err := os.Stat(bundleFile); err == nil {
p.logWarn(name, "Older version of activated bundle persisted, ignoring error")
return nil
}
return saveErr
}

return os.Rename(tmpFile, bundleFile)
}

func saveCurrentBundleToDisk(path, filename string, b *bundle.Bundle) error {
var buf bytes.Buffer

if err := bundle.NewWriter(&buf).UseModulePath(true).Write(*b); err != nil {
return err
}

if _, err := os.Stat(path); os.IsNotExist(err) {
err = os.MkdirAll(path, os.ModePerm)
if err != nil {
return err
}
}

if err := ioutil.WriteFile(filepath.Join(path, filename), buf.Bytes(), 0644); err != nil {
return err
}

return nil
}

func loadBundleFromDisk(path, name string) (*bundle.Bundle, error) {
bundlePath := filepath.Join(path, name, "bundle.tar.gz")

if _, err := os.Stat(bundlePath); err == nil {
f, err := os.Open(filepath.Join(bundlePath))
if err != nil {
return nil, err
}
defer f.Close()

b, err := bundle.NewReader(f).Read()
if err != nil {
return nil, err
}
return &b, nil

} else if os.IsNotExist(err) {
return nil, nil
} else {
return nil, err
}
}

func getDefaultBundlePersistPath() (string, error) {
pwd, err := os.Getwd()
if err != nil {
return "", err
}
return filepath.Join(pwd, ".opa"), nil
}

0 comments on commit 5a79a45

Please sign in to comment.