Skip to content

Commit

Permalink
bundle/store: deal with erasing large bundles
Browse files Browse the repository at this point in the history
Before, we've accumulated too-large transactions by retrieving data keys
according to their partitioning, and deleting them one-by-one.

Now, we'll do the same thing, but in an iterator passed to Truncate, which
deals with restarting transactions.

This also means that we've got an extra call to Truncate, so one Truncate
operation more than before. However, we've been having one Truncate per
bundle already. To fix this, we could combine multiple storage.Iterator into
a chaining iterator, and only do one Truncate operation, but that's a future
optimization.

Signed-off-by: Stephan Renatus <stephan.renatus@gmail.com>
  • Loading branch information
srenatus committed Jul 12, 2022
1 parent 1af61b5 commit 4577fad
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 33 deletions.
32 changes: 32 additions & 0 deletions bundle/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,35 @@ func sortFilePathAscend(files []file) {
return len(files[i].path) < len(files[j].path)
})
}

type eraser struct {
roots []storage.Path
idx int
}

// eraseIterator is used to use storage.Truncate for resetting bundle roots in data
func eraseIterator(roots map[string]struct{}) (storage.Iterator, error) {
it := eraser{
roots: make([]storage.Path, 0, len(roots)),
}
for r := range roots {
root, ok := storage.ParsePathEscaped("/" + r)
if !ok {
return nil, fmt.Errorf("manifest root path invalid: %v", r)
}
it.roots = append(it.roots, root)
}
return &it, nil
}

func (e *eraser) Next() (*storage.Update, error) {
if e.idx >= len(e.roots) {
return nil, io.EOF
}
next := storage.Update{
Op: storage.RemoveOp,
Path: e.roots[e.idx],
}
e.idx++
return &next, nil
}
39 changes: 19 additions & 20 deletions bundle/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ type DeactivateOpts struct {
Ctx context.Context
Store storage.Store
Txn storage.Transaction
TxnCtx *storage.Context
BundleNames map[string]struct{}
}

Expand All @@ -330,7 +331,7 @@ func Deactivate(opts *DeactivateOpts) error {
erase[root] = struct{}{}
}
}
_, err := eraseBundles(opts.Ctx, opts.Store, opts.Txn, opts.BundleNames, erase)
_, err := eraseBundles(opts.Ctx, opts.Store, opts.Txn, opts.TxnCtx, opts.BundleNames, erase)
return err
}

Expand All @@ -343,9 +344,11 @@ func activateBundles(opts *ActivateOpts) error {
snapshotBundles := map[string]*Bundle{}

for name, b := range opts.Bundles {
if b.Type() == DeltaBundleType {
switch b.Type() {
case DeltaBundleType:
deltaBundles[name] = b
} else {

default:
snapshotBundles[name] = b
names[name] = struct{}{}

Expand Down Expand Up @@ -380,7 +383,7 @@ func activateBundles(opts *ActivateOpts) error {

// Erase data and policies at new + old roots, and remove the old
// manifests before activating a new snapshot bundle.
remaining, err := eraseBundles(opts.Ctx, opts.Store, opts.Txn, names, erase)
remaining, err := eraseBundles(opts.Ctx, opts.Store, opts.Txn, opts.TxnCtx, names, erase)
if err != nil {
return err
}
Expand Down Expand Up @@ -587,9 +590,9 @@ func activateDeltaBundles(opts *ActivateOpts, bundles map[string]*Bundle) error

// erase bundles by name and roots. This will clear all policies and data at its roots and remove its
// manifest from storage.
func eraseBundles(ctx context.Context, store storage.Store, txn storage.Transaction, names map[string]struct{}, roots map[string]struct{}) (map[string]*ast.Module, error) {
func eraseBundles(ctx context.Context, store storage.Store, txn storage.Transaction, txnCtx *storage.Context, names map[string]struct{}, roots map[string]struct{}) (map[string]*ast.Module, error) {

if err := eraseData(ctx, store, txn, roots); err != nil {
if err := eraseData(ctx, store, txn, txnCtx, roots); err != nil {
return nil, err
}

Expand All @@ -615,22 +618,18 @@ func eraseBundles(ctx context.Context, store storage.Store, txn storage.Transact
return nil, err
}
}

return remaining, nil
}

func eraseData(ctx context.Context, store storage.Store, txn storage.Transaction, roots map[string]struct{}) error {
for root := range roots {
path, ok := storage.ParsePathEscaped("/" + root)
if !ok {
return fmt.Errorf("manifest root path invalid: %v", root)
}

if len(path) > 0 {
if err := store.Write(ctx, txn, storage.RemoveOp, path, nil); suppressNotFound(err) != nil {
return err
}
}
func eraseData(ctx context.Context, store storage.Store, txn storage.Transaction, txnCtx *storage.Context, roots map[string]struct{}) error {
eraser, err := eraseIterator(roots)
if err != nil {
return err
}
params := storage.WriteParams
params.Context = txnCtx
if err := store.Truncate(ctx, txn, params, eraser); err != nil {
return fmt.Errorf("store truncate failed for reset of roots '%v': %v", roots, err)
}
return nil
}
Expand Down Expand Up @@ -762,7 +761,7 @@ func compileModules(compiler *ast.Compiler, m metrics.Metrics, bundles map[strin
m.Timer(metrics.RegoModuleCompile).Start()
defer m.Timer(metrics.RegoModuleCompile).Stop()

modules := map[string]*ast.Module{}
modules := make(map[string]*ast.Module, len(compiler.Modules)+len(extraModules)+len(bundles))

// preserve any modules already on the compiler
for name, module := range compiler.Modules {
Expand Down
2 changes: 1 addition & 1 deletion bundle/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1931,7 +1931,7 @@ func TestEraseData(t *testing.T) {
roots[root] = struct{}{}
}

err := eraseData(ctx, mockStore, txn, roots)
err := eraseData(ctx, mockStore, txn, nil, roots)
if !tc.expectErr && err != nil {
t.Fatalf("unepected error: %s", err)
} else if tc.expectErr && err == nil {
Expand Down
1 change: 1 addition & 0 deletions plugins/bundle/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (p *Plugin) Reconfigure(ctx context.Context, config interface{}) {
Ctx: ctx,
Store: p.manager.Store,
Txn: txn,
TxnCtx: params.Context,
BundleNames: deletedBundles,
}
err := bundle.Deactivate(opts)
Expand Down
24 changes: 16 additions & 8 deletions storage/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ func (db *Store) Truncate(ctx context.Context, txn storage.Transaction, params s
return wrapError(err)
}

if update.IsPolicy {
switch {
case update.IsPolicy:
err = underlyingTxn.UpsertPolicy(ctx, update.Path.String(), update.Value)
if err != nil {
if err != badger.ErrTxnTooBig {
Expand All @@ -295,9 +296,10 @@ func (db *Store) Truncate(ctx context.Context, txn storage.Transaction, params s
return wrapError(err)
}
}
} else {

default:
if len(update.Path) > 0 {
sTxn, err := db.doTruncateData(ctx, underlyingTxn, newDB, params, update.Path, update.Value)
sTxn, err := db.doTruncateData(ctx, underlyingTxn, newDB, params, update.Op, update.Path, update.Value)
if err != nil {
return wrapError(err)
}
Expand All @@ -320,7 +322,7 @@ func (db *Store) Truncate(ctx context.Context, txn storage.Transaction, params s
return fmt.Errorf("storage path invalid: %v", newPath)
}

sTxn, err := db.doTruncateData(ctx, underlyingTxn, newDB, params, newPath, obj[k])
sTxn, err := db.doTruncateData(ctx, underlyingTxn, newDB, params, storage.AddOp, newPath, obj[k])
if err != nil {
return wrapError(err)
}
Expand Down Expand Up @@ -388,10 +390,16 @@ func (db *Store) Truncate(ctx context.Context, txn storage.Transaction, params s
return wrapError(db.cleanup(oldDb, backupDir))
}

func (db *Store) doTruncateData(ctx context.Context, underlying *transaction, badgerdb *badger.DB,
params storage.TransactionParams, path storage.Path, value interface{}) (*transaction, error) {
func (db *Store) doTruncateData(
ctx context.Context,
underlying *transaction,
badgerdb *badger.DB,
params storage.TransactionParams,
op storage.PatchOp,
path storage.Path,
value interface{}) (*transaction, error) {

err := underlying.Write(ctx, storage.AddOp, path, value)
err := underlying.Write(ctx, op, path, value)
if err != nil {
if err != badger.ErrTxnTooBig {
return nil, wrapError(err)
Expand All @@ -406,7 +414,7 @@ func (db *Store) doTruncateData(ctx context.Context, underlying *transaction, ba
xid := atomic.AddUint64(&db.xid, uint64(1))
sTxn := newTransaction(xid, true, txn, params.Context, db.pm, db.partitions, nil)

if err = sTxn.Write(ctx, storage.AddOp, path, value); err != nil {
if err = sTxn.Write(ctx, op, path, value); err != nil {
return nil, wrapError(err)
}

Expand Down
21 changes: 17 additions & 4 deletions storage/inmem/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,26 @@ func (db *store) Truncate(ctx context.Context, txn storage.Transaction, _ storag
break
}

if update.IsPolicy {
switch {
case update.IsPolicy:
err = underlying.UpsertPolicy(update.Path.String(), update.Value)
if err != nil {
return err
}
} else {

case update.Op == storage.RemoveOp:
_, err := underlying.Read(update.Path)
if err != nil {
if !storage.IsNotFound(err) {
return err
}
} else {
if err := underlying.Write(storage.RemoveOp, update.Path, nil); err != nil {
return err
}
}

case update.Op == storage.AddOp:
if len(update.Path) > 0 {
var obj interface{}
err = util.Unmarshal(update.Value, &obj)
Expand All @@ -132,8 +146,7 @@ func (db *store) Truncate(ctx context.Context, txn storage.Transaction, _ storag
}
}

err = underlying.Write(storage.AddOp, update.Path, obj)
if err != nil {
if err := underlying.Write(storage.AddOp, update.Path, obj); err != nil {
return err
}
} else {
Expand Down
1 change: 1 addition & 0 deletions storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ type Iterator interface {

// Update contains information about a file
type Update struct {
Op PatchOp
Path Path
Value []byte
IsPolicy bool
Expand Down

0 comments on commit 4577fad

Please sign in to comment.