Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/disk: bundles issue 4868 #4877

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
87 changes: 81 additions & 6 deletions bundle/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"archive/tar"
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"os"
Expand All @@ -16,6 +17,11 @@ import (
"github.com/open-policy-agent/opa/storage"
)

// splitSize is the breaking point for large bundle data values: if the byteslice's
// len exceeds this, we'll attempt to break it into substructure values to avoid
// too-big-txn errors.
const splitSize = 1 << 10

// Descriptor contains information about a file and
// can be used to read the file contents.
type Descriptor struct {
Expand Down Expand Up @@ -283,7 +289,9 @@ func (it *iterator) Next() (*storage.Update, error) {
f := file{name: item.Path}

fpath := strings.TrimLeft(filepath.ToSlash(filepath.Dir(f.name)), "/.")
if strings.HasSuffix(f.name, RegoExt) {
isPolicy := strings.HasSuffix(f.name, RegoExt)

if isPolicy {
fpath = strings.Trim(f.name, "/")
}

Expand All @@ -294,8 +302,12 @@ func (it *iterator) Next() (*storage.Update, error) {
f.path = p

f.raw = item.Value
if isPolicy || isSmallEnough(item.Value) {
it.files = append(it.files, f)
} else {
it.files = append(it.files, splitFile(f)...)
}

it.files = append(it.files, f)
}

sortFilePathAscend(it.files)
Expand All @@ -310,10 +322,7 @@ func (it *iterator) Next() (*storage.Update, error) {
f := it.files[it.idx]
it.idx++

isPolicy := false
if strings.HasSuffix(f.name, RegoExt) {
isPolicy = true
}
isPolicy := strings.HasSuffix(f.name, RegoExt)
srenatus marked this conversation as resolved.
Show resolved Hide resolved

return &storage.Update{
Path: f.path,
Expand All @@ -322,6 +331,40 @@ func (it *iterator) Next() (*storage.Update, error) {
}, nil
}

// isSmallEnough reports whether the raw value bs is below the splitSize
// threshold and can therefore be written as-is without being broken into
// substructure values first.
func isSmallEnough(bs []byte) bool {
	return splitSize > len(bs)
}

func splitFile(f file) []file {
obj := make(map[string]json.RawMessage)
if err := json.Unmarshal(f.raw, &obj); err == nil {
ret := make([]file, 0, len(obj))
for k, v := range obj {
ret = append(ret, file{
name: f.name,
path: append(f.path, k),
raw: v,
})
}
return ret
}

arr := make([]json.RawMessage, 0)
if err := json.Unmarshal(f.raw, &arr); err == nil {
ret := make([]file, 0, len(obj))
for i, v := range arr {
ret = append(ret, file{
name: f.name,
path: append(f.path, fmt.Sprint(i)),
raw: v,
})
}
return ret
}

return []file{f} // try it anyways
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the 馃-it-might-still-work-if-we-try code path. We might still fail, but maybe we don't, because the smallSize cutoff value isn't based on any calculation, but a guess.

}

type iterator struct {
raw []Raw
files []file
Expand All @@ -340,3 +383,35 @@ func sortFilePathAscend(files []file) {
return len(files[i].path) < len(files[j].path)
})
}

// eraser is a storage.Iterator that yields one RemoveOp update per bundle
// root; it is fed to storage.Truncate to wipe bundle data.
type eraser struct {
	roots []storage.Path // parsed root paths to erase
	idx   int            // index of the next root Next() will emit
}

// eraseIterator builds a storage.Iterator whose updates remove each of the
// given bundle roots; it is used with storage.Truncate to reset bundle data.
// An error is returned if any root does not parse as a storage path.
func eraseIterator(roots map[string]struct{}) (storage.Iterator, error) {
	parsed := make([]storage.Path, 0, len(roots))
	for r := range roots {
		p, ok := storage.ParsePathEscaped("/" + r)
		if !ok {
			return nil, fmt.Errorf("manifest root path invalid: %v", r)
		}
		parsed = append(parsed, p)
	}
	return &eraser{roots: parsed}, nil
}

// Next implements storage.Iterator: it emits a RemoveOp update for each root
// in turn and returns io.EOF once every root has been yielded.
func (e *eraser) Next() (*storage.Update, error) {
	if len(e.roots) <= e.idx {
		return nil, io.EOF
	}
	u := &storage.Update{
		Op:   storage.RemoveOp,
		Path: e.roots[e.idx],
	}
	e.idx++
	return u, nil
}
39 changes: 19 additions & 20 deletions bundle/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ type DeactivateOpts struct {
Ctx context.Context
Store storage.Store
Txn storage.Transaction
TxnCtx *storage.Context
BundleNames map[string]struct{}
}

Expand All @@ -330,7 +331,7 @@ func Deactivate(opts *DeactivateOpts) error {
erase[root] = struct{}{}
}
}
_, err := eraseBundles(opts.Ctx, opts.Store, opts.Txn, opts.BundleNames, erase)
_, err := eraseBundles(opts.Ctx, opts.Store, opts.Txn, opts.TxnCtx, opts.BundleNames, erase)
return err
}

Expand All @@ -343,9 +344,11 @@ func activateBundles(opts *ActivateOpts) error {
snapshotBundles := map[string]*Bundle{}

for name, b := range opts.Bundles {
if b.Type() == DeltaBundleType {
switch b.Type() {
case DeltaBundleType:
deltaBundles[name] = b
} else {

default:
snapshotBundles[name] = b
names[name] = struct{}{}

Expand Down Expand Up @@ -380,7 +383,7 @@ func activateBundles(opts *ActivateOpts) error {

// Erase data and policies at new + old roots, and remove the old
// manifests before activating a new snapshot bundle.
remaining, err := eraseBundles(opts.Ctx, opts.Store, opts.Txn, names, erase)
remaining, err := eraseBundles(opts.Ctx, opts.Store, opts.Txn, opts.TxnCtx, names, erase)
if err != nil {
return err
}
Expand Down Expand Up @@ -587,9 +590,9 @@ func activateDeltaBundles(opts *ActivateOpts, bundles map[string]*Bundle) error

// erase bundles by name and roots. This will clear all policies and data at its roots and remove its
// manifest from storage.
func eraseBundles(ctx context.Context, store storage.Store, txn storage.Transaction, names map[string]struct{}, roots map[string]struct{}) (map[string]*ast.Module, error) {
func eraseBundles(ctx context.Context, store storage.Store, txn storage.Transaction, txnCtx *storage.Context, names map[string]struct{}, roots map[string]struct{}) (map[string]*ast.Module, error) {

if err := eraseData(ctx, store, txn, roots); err != nil {
if err := eraseData(ctx, store, txn, txnCtx, roots); err != nil {
return nil, err
}

Expand All @@ -615,22 +618,18 @@ func eraseBundles(ctx context.Context, store storage.Store, txn storage.Transact
return nil, err
}
}

return remaining, nil
}

func eraseData(ctx context.Context, store storage.Store, txn storage.Transaction, roots map[string]struct{}) error {
for root := range roots {
path, ok := storage.ParsePathEscaped("/" + root)
if !ok {
return fmt.Errorf("manifest root path invalid: %v", root)
}

if len(path) > 0 {
if err := store.Write(ctx, txn, storage.RemoveOp, path, nil); suppressNotFound(err) != nil {
return err
}
}
// eraseData removes all data under the given bundle roots using a single
// storage.Truncate call, rather than one Write per root, to avoid exceeding
// the store's transaction size limit. txnCtx is attached to the write params
// so the store implementation sees the caller's transaction context.
func eraseData(ctx context.Context, store storage.Store, txn storage.Transaction, txnCtx *storage.Context, roots map[string]struct{}) error {
	// NOTE: the local was previously named `eraser`, shadowing the eraser type.
	it, err := eraseIterator(roots)
	if err != nil {
		return err
	}
	params := storage.WriteParams
	params.Context = txnCtx
	if err := store.Truncate(ctx, txn, params, it); err != nil {
		return fmt.Errorf("store truncate failed for reset of roots '%v': %w", roots, err)
	}
	return nil
}
Expand Down Expand Up @@ -762,7 +761,7 @@ func compileModules(compiler *ast.Compiler, m metrics.Metrics, bundles map[strin
m.Timer(metrics.RegoModuleCompile).Start()
defer m.Timer(metrics.RegoModuleCompile).Stop()

modules := map[string]*ast.Module{}
modules := make(map[string]*ast.Module, len(compiler.Modules)+len(extraModules)+len(bundles))

// preserve any modules already on the compiler
for name, module := range compiler.Modules {
Expand Down
2 changes: 1 addition & 1 deletion bundle/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1931,7 +1931,7 @@ func TestEraseData(t *testing.T) {
roots[root] = struct{}{}
}

err := eraseData(ctx, mockStore, txn, roots)
err := eraseData(ctx, mockStore, txn, nil, roots)
if !tc.expectErr && err != nil {
t.Fatalf("unepected error: %s", err)
} else if tc.expectErr && err == nil {
Expand Down
1 change: 1 addition & 0 deletions plugins/bundle/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (p *Plugin) Reconfigure(ctx context.Context, config interface{}) {
Ctx: ctx,
Store: p.manager.Store,
Txn: txn,
TxnCtx: params.Context,
BundleNames: deletedBundles,
}
err := bundle.Deactivate(opts)
Expand Down
28 changes: 16 additions & 12 deletions storage/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ func (db *Store) Truncate(ctx context.Context, txn storage.Transaction, params s
return wrapError(err)
}

if update.IsPolicy {
switch {
case update.IsPolicy:
err = underlyingTxn.UpsertPolicy(ctx, update.Path.String(), update.Value)
if err != nil {
if err != badger.ErrTxnTooBig {
Expand All @@ -295,9 +296,10 @@ func (db *Store) Truncate(ctx context.Context, txn storage.Transaction, params s
return wrapError(err)
}
}
} else {

default:
if len(update.Path) > 0 {
sTxn, err := db.doTruncateData(ctx, underlyingTxn, newDB, params, update.Path, update.Value)
sTxn, err := db.doTruncateData(ctx, underlyingTxn, newDB, params, update.Op, update.Path, update.Value)
if err != nil {
return wrapError(err)
}
Expand All @@ -320,11 +322,7 @@ func (db *Store) Truncate(ctx context.Context, txn storage.Transaction, params s
return fmt.Errorf("storage path invalid: %v", newPath)
}

if err := storage.MakeDir(ctx, db, txn, newPath[:len(newPath)-1]); err != nil {
return err
}
srenatus marked this conversation as resolved.
Show resolved Hide resolved

sTxn, err := db.doTruncateData(ctx, underlyingTxn, newDB, params, newPath, obj[k])
sTxn, err := db.doTruncateData(ctx, underlyingTxn, newDB, params, storage.AddOp, newPath, obj[k])
if err != nil {
return wrapError(err)
}
Expand Down Expand Up @@ -392,10 +390,16 @@ func (db *Store) Truncate(ctx context.Context, txn storage.Transaction, params s
return wrapError(db.cleanup(oldDb, backupDir))
}

func (db *Store) doTruncateData(ctx context.Context, underlying *transaction, badgerdb *badger.DB,
params storage.TransactionParams, path storage.Path, value interface{}) (*transaction, error) {
func (db *Store) doTruncateData(
ctx context.Context,
underlying *transaction,
badgerdb *badger.DB,
params storage.TransactionParams,
op storage.PatchOp,
path storage.Path,
value interface{}) (*transaction, error) {

err := underlying.Write(ctx, storage.AddOp, path, value)
err := underlying.Write(ctx, op, path, value)
if err != nil {
if err != badger.ErrTxnTooBig {
return nil, wrapError(err)
Expand All @@ -410,7 +414,7 @@ func (db *Store) doTruncateData(ctx context.Context, underlying *transaction, ba
xid := atomic.AddUint64(&db.xid, uint64(1))
sTxn := newTransaction(xid, true, txn, params.Context, db.pm, db.partitions, nil)

if err = sTxn.Write(ctx, storage.AddOp, path, value); err != nil {
if err = sTxn.Write(ctx, op, path, value); err != nil {
return nil, wrapError(err)
}

Expand Down
21 changes: 17 additions & 4 deletions storage/inmem/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,26 @@ func (db *store) Truncate(ctx context.Context, txn storage.Transaction, _ storag
break
}

if update.IsPolicy {
switch {
case update.IsPolicy:
err = underlying.UpsertPolicy(update.Path.String(), update.Value)
if err != nil {
return err
}
} else {

case update.Op == storage.RemoveOp:
_, err := underlying.Read(update.Path)
if err != nil {
if !storage.IsNotFound(err) {
return err
}
} else {
if err := underlying.Write(storage.RemoveOp, update.Path, nil); err != nil {
return err
}
}

case update.Op == storage.AddOp:
if len(update.Path) > 0 {
var obj interface{}
err = util.Unmarshal(update.Value, &obj)
Expand All @@ -132,8 +146,7 @@ func (db *store) Truncate(ctx context.Context, txn storage.Transaction, _ storag
}
}

err = underlying.Write(storage.AddOp, update.Path, obj)
if err != nil {
if err := underlying.Write(storage.AddOp, update.Path, obj); err != nil {
return err
}
} else {
Expand Down
1 change: 1 addition & 0 deletions storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ type Iterator interface {

// Update contains information about a file
type Update struct {
Op PatchOp
Path Path
Value []byte
IsPolicy bool
Expand Down