diff --git a/docs/content/configuration.md b/docs/content/configuration.md index 8e7fa8eb78..5307f669bb 100644 --- a/docs/content/configuration.md +++ b/docs/content/configuration.md @@ -39,6 +39,7 @@ bundles: authz: service: acmecorp resource: bundles/http/example/authz.tar.gz + persist: true polling: min_delay_seconds: 60 max_delay_seconds: 120 @@ -388,6 +389,7 @@ included in the actual bundle gzipped tarball. | `bundles[_].service` | `string` | Yes | Name of service to use to contact remote server. | | `bundles[_].polling.min_delay_seconds` | `int64` | No (default: `60`) | Minimum amount of time to wait between bundle downloads. | | `bundles[_].polling.max_delay_seconds` | `int64` | No (default: `120`) | Maximum amount of time to wait between bundle downloads. | +| `bundles[_].persist` | `bool` | No | Persist activated bundles to disk. | | `bundles[_].signing.keyid` | `string` | No | Name of the key to use for bundle signature verification. | | `bundles[_].signing.scope` | `string` | No | Scope to use for bundle signature verification. | | `bundles[_].signing.exclude_files` | `array` | No | Files in the bundle to exclude during verification. | diff --git a/docs/content/management.md b/docs/content/management.md index 343c0431ea..76d0f2e8cc 100644 --- a/docs/content/management.md +++ b/docs/content/management.md @@ -97,6 +97,7 @@ bundles: authz: service: acmecorp resource: somedir/bundle.tar.gz + persist: true polling: min_delay_seconds: 10 max_delay_seconds: 20 @@ -125,6 +126,10 @@ be useful when relying on default `resource` behavior with a name like `authz/bundle.tar.gz` which results in a `resource` of `bundles/authz/bundle.tar.gz`. +OPA can optionally save and read bundles from disk based on the value of the `bundles[_].persist` +field. If this field is set, OPA will persist activated bundles to disk and load bundles from disk too in scenarios such as +OPA being unable to communicate with the bundle server. + The optional `bundles[_].signing` field can be used to specify the `keyid` and `scope` that should be used for verifying the signature of the bundle. See [this](#bundle-signature) section for details. diff --git a/plugins/bundle/config.go b/plugins/bundle/config.go index 62e36c90d4..2ab454e4e4 100644 --- a/plugins/bundle/config.go +++ b/plugins/bundle/config.go @@ -42,6 +42,7 @@ func ParseConfig(config []byte, services []string) (*Config, error) { Service: parsedConfig.Service, Resource: parsedConfig.generateLegacyResourcePath(), Signing: nil, + Persist: false, }, } @@ -135,6 +136,7 @@ type Source struct { Service string `json:"service"` Resource string `json:"resource"` Signing *bundle.VerificationConfig `json:"signing"` + Persist bool `json:"persist"` } // IsMultiBundle returns whether or not the config is the newer multi-bundle diff --git a/plugins/bundle/plugin.go b/plugins/bundle/plugin.go index 1f8cea30f1..b8e2181d3b 100644 --- a/plugins/bundle/plugin.go +++ b/plugins/bundle/plugin.go @@ -6,9 +6,13 @@ package bundle import ( + "bytes" "context" "errors" "fmt" + "io/ioutil" + "os" + "path/filepath" "reflect" "sync" "time" @@ -27,17 +31,18 @@ import ( // Plugin implements bundle activation. type Plugin struct { - config Config - manager *plugins.Manager // plugin manager for storage and service clients - status map[string]*Status // current status for each bundle - etags map[string]string // etag on last successful activation - listeners map[interface{}]func(Status) // listeners to send status updates to - bulkListeners map[interface{}]func(map[string]*Status) // listeners to send aggregated status updates to - downloaders map[string]*download.Downloader - mtx sync.Mutex - cfgMtx sync.Mutex - legacyConfig bool - ready bool + config Config + manager *plugins.Manager // plugin manager for storage and service clients + status map[string]*Status // current status for each bundle + etags map[string]string // etag on last successful activation + listeners map[interface{}]func(Status) // listeners to send status updates to + bulkListeners map[interface{}]func(map[string]*Status) // listeners to send aggregated status updates to + downloaders map[string]*download.Downloader + mtx sync.Mutex + cfgMtx sync.Mutex + legacyConfig bool + ready bool + bundlePersistPath string } // New returns a new Plugin with the given config. @@ -79,6 +84,19 @@ func Lookup(manager *plugins.Manager) *Plugin { func (p *Plugin) Start(ctx context.Context) error { p.mtx.Lock() defer p.mtx.Unlock() + + var err error + + p.bundlePersistPath, err = getDefaultBundlePersistPath() + if err != nil { + return err + } + + err = p.loadAndActivateBundlesFromDisk(ctx) + if err != nil { + return err + } + p.initDownloaders() for name, dl := range p.downloaders { p.logInfo(name, "Starting bundle downloader.") @@ -245,6 +263,38 @@ func (p *Plugin) initDownloaders() { } } +func (p *Plugin) loadAndActivateBundlesFromDisk(ctx context.Context) error { + for name := range p.config.Bundles { + if p.persistBundle(name) { + b, err := loadBundleFromDisk(p.bundlePersistPath, name) + if err != nil { + p.logError(name, "Failed to load bundle from disk: %v", err) + return err + } + + if b == nil { + return nil + } + + p.status[name].Metrics = metrics.New() + + err = p.activate(ctx, name, b) + if err != nil { + p.logError(name, "Bundle activation failed: %v", err) + return err + } + + p.status[name].SetError(nil) + p.status[name].SetActivateSuccess(b.Manifest.Revision) + + p.checkPluginReadiness() + + p.logDebug(name, "Bundle loaded from disk and activated successfully.") + } + } + return nil +} + func (p *Plugin) newDownloader(name string, source *Source) *download.Downloader { conf := source.Config client := p.manager.Client(source.Service) @@ -311,8 +361,22 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) { return } + if p.persistBundle(name) { + p.logDebug(name, "Persisting bundle to disk in progress.") + + err := p.saveBundleToDisk(name, u.Bundle) + if err != nil { + p.logError(name, "Persisting bundle to disk failed: %v", err) + p.status[name].SetError(err) + p.downloaders[name].ClearCache() + return + } + p.logDebug(name, "Bundle persisted to disk successfully at path %v.", filepath.Join(p.bundlePersistPath, name)) + } + p.status[name].SetError(nil) p.status[name].SetActivateSuccess(u.Bundle.Manifest.Revision) + if u.ETag != "" { p.logInfo(name, "Bundle downloaded and activated successfully. Etag updated to %v.", u.ETag) } else { @@ -321,21 +385,7 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) { p.etags[name] = u.ETag // If the plugin wasn't ready yet then check if we are now after activating this bundle. - if !p.ready { - readyNow := true // optimistically - for _, status := range p.status { - if len(status.Errors) > 0 || (status.LastSuccessfulActivation == time.Time{}) { - readyNow = false // Not ready yet, check again on next bundle activation. - break - } - } - - if readyNow { - p.ready = true - p.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateOK}) - } - - } + p.checkPluginReadiness() return } @@ -346,6 +396,23 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) { } } +func (p *Plugin) checkPluginReadiness() { + if !p.ready { + readyNow := true // optimistically + for _, status := range p.status { + if len(status.Errors) > 0 || (status.LastSuccessfulActivation == time.Time{}) { + readyNow = false // Not ready yet, check again on next bundle activation. + break + } + } + + if readyNow { + p.ready = true + p.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateOK}) + } + } +} + func (p *Plugin) activate(ctx context.Context, name string, b *bundle.Bundle) error { p.logDebug(name, "Bundle activation in progress. Opening storage transaction.") @@ -385,6 +452,15 @@ func (p *Plugin) activate(ctx context.Context, name string, b *bundle.Bundle) er return err } +func (p *Plugin) persistBundle(name string) bool { + bundleSrc := p.config.Bundles[name] + + if bundleSrc == nil { + return false + } + return bundleSrc.Persist +} + func (p *Plugin) logError(bundleName string, fmt string, a ...interface{}) { logrus.WithFields(p.logrusFields(bundleName)).Errorf(fmt, a...) } @@ -397,6 +473,10 @@ func (p *Plugin) logDebug(bundleName string, fmt string, a ...interface{}) { logrus.WithFields(p.logrusFields(bundleName)).Debugf(fmt, a...) } +func (p *Plugin) logWarn(bundleName string, fmt string, a ...interface{}) { + logrus.WithFields(p.logrusFields(bundleName)).Warnf(fmt, a...) +} + func (p *Plugin) logrusFields(bundleName string) logrus.Fields { f := logrus.Fields{ @@ -429,3 +509,79 @@ func (p *Plugin) configDelta(newConfig *Config) (map[string]*Source, map[string] return newBundles, updatedBundles, deletedBundles } + +func (p *Plugin) saveBundleToDisk(name string, b *bundle.Bundle) error { + + bundleDir := filepath.Join(p.bundlePersistPath, name) + tmpFile := filepath.Join(bundleDir, ".bundle.tar.gz.tmp") + bundleFile := filepath.Join(bundleDir, "bundle.tar.gz") + + saveErr := saveCurrentBundleToDisk(bundleDir, ".bundle.tar.gz.tmp", b) + if saveErr != nil { + p.logWarn(name, "Failed to save new bundle to disk: %v", saveErr) + + if err := os.Remove(tmpFile); err != nil { + p.logWarn(name, "Failed to remove temp file ('%s'): %v", tmpFile, err) + } + + if _, err := os.Stat(bundleFile); err == nil { + p.logWarn(name, "Older version of activated bundle persisted, ignoring error") + return nil + } + return saveErr + } + + return os.Rename(tmpFile, bundleFile) +} + +func saveCurrentBundleToDisk(path, filename string, b *bundle.Bundle) error { + var buf bytes.Buffer + + if err := bundle.NewWriter(&buf).UseModulePath(true).Write(*b); err != nil { + return err + } + + if _, err := os.Stat(path); os.IsNotExist(err) { + err = os.MkdirAll(path, os.ModePerm) + if err != nil { + return err + } + } + + if err := ioutil.WriteFile(filepath.Join(path, filename), buf.Bytes(), 0644); err != nil { + return err + } + + return nil +} + +func loadBundleFromDisk(path, name string) (*bundle.Bundle, error) { + bundlePath := filepath.Join(path, name, "bundle.tar.gz") + + if _, err := os.Stat(bundlePath); err == nil { + f, err := os.Open(filepath.Join(bundlePath)) + if err != nil { + return nil, err + } + defer f.Close() + + b, err := bundle.NewReader(f).Read() + if err != nil { + return nil, err + } + return &b, nil + + } else if os.IsNotExist(err) { + return nil, nil + } else { + return nil, err + } +} + +func getDefaultBundlePersistPath() (string, error) { + pwd, err := os.Getwd() + if err != nil { + return "", err + } + return filepath.Join(pwd, ".opa"), nil +} diff --git a/plugins/bundle/plugin_test.go b/plugins/bundle/plugin_test.go index da6cb637da..e40c9dacb5 100644 --- a/plugins/bundle/plugin_test.go +++ b/plugins/bundle/plugin_test.go @@ -9,8 +9,10 @@ import ( "context" "errors" "fmt" + "io/ioutil" "net/http" "net/http/httptest" + "os" "path/filepath" "reflect" "sort" @@ -85,7 +87,206 @@ func TestPluginOneShot(t *testing.T) { } else if !reflect.DeepEqual(data, expData) { t.Fatalf("Bad data content. Exp:\n%v\n\nGot:\n\n%v", expData, data) } +} + +func TestPluginStart(t *testing.T) { + + ctx := context.Background() + manager := getTestManager() + bundles := map[string]*Source{} + + plugin := New(&Config{Bundles: bundles}, manager) + err := plugin.Start(ctx) + if err != nil { + t.Fatal("unexpected error:", err) + } +} + +func TestPluginOneShotBundlePersistence(t *testing.T) { + + ctx := context.Background() + manager := getTestManager() + + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + defer os.RemoveAll(dir) + + bundleName := "test-bundle" + bundleSource := Source{ + Persist: true, + } + + bundles := map[string]*Source{} + bundles[bundleName] = &bundleSource + + plugin := New(&Config{Bundles: bundles}, manager) + + plugin.status[bundleName] = &Status{Name: bundleName, Metrics: metrics.New()} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) + plugin.bundlePersistPath = filepath.Join(dir, ".opa") + + ensurePluginState(t, plugin, plugins.StateNotReady) + + // simulate a bundle download error with no bundle on disk + plugin.oneShot(ctx, bundleName, download.Update{Error: fmt.Errorf("unknown error")}) + + if plugin.status[bundleName].Message == "" { + t.Fatal("expected error but got none") + } + + ensurePluginState(t, plugin, plugins.StateNotReady) + + // download a bundle and persist to disk. Then verify the bundle persisted to disk + module := "package foo\n\ncorge=1" + + b := bundle.Bundle{ + Manifest: bundle.Manifest{Revision: "quickbrownfaux"}, + Data: util.MustUnmarshalJSON([]byte(`{"foo": {"bar": 1, "baz": "qux"}}`)).(map[string]interface{}), + Modules: []bundle.ModuleFile{ + bundle.ModuleFile{ + URL: "/foo/bar.rego", + Path: "/foo/bar.rego", + Parsed: ast.MustParseModule(module), + Raw: []byte(module), + }, + }, + } + + b.Manifest.Init() + + plugin.oneShot(ctx, bundleName, download.Update{Bundle: &b, Metrics: metrics.New()}) + + ensurePluginState(t, plugin, plugins.StateOK) + + result, err := loadBundleFromDisk(plugin.bundlePersistPath, bundleName) + if err != nil { + t.Fatal("unexpected error:", err) + } + + if !result.Equal(b) { + t.Fatal("expected the downloaded bundle to be equal to the one loaded from disk") + } + + // simulate a bundle download error and verify that the bundle on disk is activated + plugin.oneShot(ctx, bundleName, download.Update{Error: fmt.Errorf("unknown error")}) + + ensurePluginState(t, plugin, plugins.StateOK) + + txn := storage.NewTransactionOrDie(ctx, manager.Store) + defer manager.Store.Abort(ctx, txn) + + ids, err := manager.Store.ListPolicies(ctx, txn) + if err != nil { + t.Fatal(err) + } else if len(ids) != 1 { + t.Fatal("Expected 1 policy") + } + bs, err := manager.Store.GetPolicy(ctx, txn, ids[0]) + exp := []byte("package foo\n\ncorge=1") + if err != nil { + t.Fatal(err) + } else if !bytes.Equal(bs, exp) { + t.Fatalf("Bad policy content. Exp:\n%v\n\nGot:\n\n%v", string(exp), string(bs)) + } + + data, err := manager.Store.Read(ctx, txn, storage.Path{}) + expData := util.MustUnmarshalJSON([]byte(`{"foo": {"bar": 1, "baz": "qux"}, "system": {"bundles": {"test-bundle": {"manifest": {"revision": "quickbrownfaux", "roots": [""]}}}}}`)) + if err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(data, expData) { + t.Fatalf("Bad data content. Exp:\n%v\n\nGot:\n\n%v", expData, data) + } +} + +func TestLoadAndActivateBundlesFromDisk(t *testing.T) { + + ctx := context.Background() + manager := getTestManager() + + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + defer os.RemoveAll(dir) + + bundleName := "test-bundle" + bundleSource := Source{ + Persist: true, + } + + bundleNameOther := "test-bundle-other" + bundleSourceOther := Source{} + + bundles := map[string]*Source{} + bundles[bundleName] = &bundleSource + bundles[bundleNameOther] = &bundleSourceOther + + plugin := New(&Config{Bundles: bundles}, manager) + plugin.bundlePersistPath = filepath.Join(dir, ".opa") + + err = plugin.loadAndActivateBundlesFromDisk(ctx) + if err != nil { + t.Fatal("unexpected error:", err) + } + + // persist a bundle to disk and then load it + module := "package foo\n\ncorge=1" + + b := bundle.Bundle{ + Manifest: bundle.Manifest{Revision: "quickbrownfaux"}, + Data: util.MustUnmarshalJSON([]byte(`{"foo": {"bar": 1, "baz": "qux"}}`)).(map[string]interface{}), + Modules: []bundle.ModuleFile{ + bundle.ModuleFile{ + URL: "/foo/bar.rego", + Path: "/foo/bar.rego", + Parsed: ast.MustParseModule(module), + Raw: []byte(module), + }, + }, + } + + b.Manifest.Init() + + err = plugin.saveBundleToDisk(bundleName, &b) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + err = plugin.loadAndActivateBundlesFromDisk(ctx) + if err != nil { + t.Fatal("unexpected error:", err) + } + + txn := storage.NewTransactionOrDie(ctx, manager.Store) + defer manager.Store.Abort(ctx, txn) + + ids, err := manager.Store.ListPolicies(ctx, txn) + if err != nil { + t.Fatal(err) + } else if len(ids) != 1 { + t.Fatal("Expected 1 policy") + } + + bs, err := manager.Store.GetPolicy(ctx, txn, ids[0]) + exp := []byte("package foo\n\ncorge=1") + if err != nil { + t.Fatal(err) + } else if !bytes.Equal(bs, exp) { + t.Fatalf("Bad policy content. Exp:\n%v\n\nGot:\n\n%v", string(exp), string(bs)) + } + + data, err := manager.Store.Read(ctx, txn, storage.Path{}) + expData := util.MustUnmarshalJSON([]byte(`{"foo": {"bar": 1, "baz": "qux"}, "system": {"bundles": {"test-bundle": {"manifest": {"revision": "quickbrownfaux", "roots": [""]}}}}}`)) + if err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(data, expData) { + t.Fatalf("Bad data content. Exp:\n%v\n\nGot:\n\n%v", expData, data) + } } func TestPluginOneShotCompileError(t *testing.T) { @@ -1384,6 +1585,197 @@ func TestUpgradeLegacyBundleToMuiltiBundleNewBundles(t *testing.T) { } } +func TestSaveBundleToDiskNew(t *testing.T) { + + manager := getTestManager() + + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + defer os.RemoveAll(dir) + + bundles := map[string]*Source{} + plugin := New(&Config{Bundles: bundles}, manager) + plugin.bundlePersistPath = filepath.Join(dir, ".opa") + + b := getTestBundle(t) + + err = plugin.saveBundleToDisk("foo", &b) + if err != nil { + t.Fatalf("unexpected error %v", err) + } +} + +func TestSaveBundleToDiskOverWrite(t *testing.T) { + + manager := getTestManager() + + // test to check existing bundle is replaced + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + defer os.RemoveAll(dir) + + bundles := map[string]*Source{} + plugin := New(&Config{Bundles: bundles}, manager) + plugin.bundlePersistPath = filepath.Join(dir, ".opa") + + bundleName := "foo" + bundleDir := filepath.Join(plugin.bundlePersistPath, bundleName) + + err = os.MkdirAll(bundleDir, os.ModePerm) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + b2 := writeTestBundleToDisk(t, bundleDir) + + module := "package a.a1\n\nbar=1" + + newBundle := bundle.Bundle{ + Manifest: bundle.Manifest{Revision: "quickbrownfaux", Roots: &[]string{"a/a1", "a/a2"}}, + Data: map[string]interface{}{ + "a": map[string]interface{}{ + "a2": "foo", + }, + }, + Modules: []bundle.ModuleFile{ + bundle.ModuleFile{ + Path: "bundle/id1", + Parsed: ast.MustParseModule(module), + Raw: []byte(module), + }, + }, + } + newBundle.Manifest.Init() + + err = plugin.saveBundleToDisk("foo", &newBundle) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + actual, err := loadBundleFromDisk(plugin.bundlePersistPath, "foo") + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + if actual.Equal(b2) { + t.Fatal("expected existing bundle to be overwritten") + } +} + +func TestSaveCurrentBundleToDisk(t *testing.T) { + b := getTestBundle(t) + + srcDir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + defer os.RemoveAll(srcDir) + + err = saveCurrentBundleToDisk(srcDir, "bundle.tar.gz", &b) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + if _, err := os.Stat(filepath.Join(srcDir, "bundle.tar.gz")); err != nil { + t.Fatalf("unexpected error %v", err) + } +} + +func TestLoadBundleFromDisk(t *testing.T) { + + // no bundle on disk + _, err := loadBundleFromDisk("foo", "bar") + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + // create a test bundle and load it from disk + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + defer os.RemoveAll(dir) + + bundleName := "foo" + bundleDir := filepath.Join(dir, bundleName) + + err = os.MkdirAll(bundleDir, os.ModePerm) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + b := writeTestBundleToDisk(t, bundleDir) + + result, err := loadBundleFromDisk(dir, bundleName) + if err != nil { + t.Fatal("unexpected error:", err) + } + + if !result.Equal(b) { + t.Fatal("expected the test bundle to be equal to the one loaded from disk") + } +} + +func TestGetDefaultBundlePersistPath(t *testing.T) { + path, err := getDefaultBundlePersistPath() + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + if !strings.HasSuffix(path, ".opa") { + t.Fatal("expected default persist path to include '.opa' dir") + } +} + +func writeTestBundleToDisk(t *testing.T, srcDir string) bundle.Bundle { + t.Helper() + + b := getTestBundle(t) + + var buf bytes.Buffer + if err := bundle.NewWriter(&buf).UseModulePath(true).Write(b); err != nil { + t.Fatalf("unexpected error %v", err) + } + + if err := ioutil.WriteFile(filepath.Join(srcDir, "bundle.tar.gz"), buf.Bytes(), 0644); err != nil { + t.Fatalf("unexpected error %v", err) + } + + return b +} + +func getTestBundle(t *testing.T) bundle.Bundle { + t.Helper() + + module := "package gork\np[x] { x = 1 }" + + b := bundle.Bundle{ + Manifest: bundle.Manifest{ + Revision: "quickbrownfaux", + }, + Data: map[string]interface{}{}, + Modules: []bundle.ModuleFile{ + { + Path: "/foo.rego", + URL: "/foo.rego", + Parsed: ast.MustParseModule(module), + Raw: []byte(module), + }, + }, + } + + b.Manifest.Init() + return b +} + func validateStoreState(ctx context.Context, t *testing.T, store storage.Store, root string, expData interface{}, expIds []string, expBundleName string, expBundleRev string) { t.Helper() if err := storage.Txn(ctx, store, storage.TransactionParams{}, func(txn storage.Transaction) error {