storage/inmem: Avoid unnecessary read operation
While writing data to the in-memory store via the truncate operation,
OPA reads the data at the parent of each update's path before adding
the new data to the store, to ensure the path exists.

A bundle that contains data files at non-root locations will
trigger a read on the store for each file, so for a large
bundle this can increase both the bundle activation time
and resource usage.

This change avoids the repeated read operations by merging all the data
in the bundle and performing a single write on the store (a minimal
sketch of this approach appears just before the diff below).

This fix was tested by observing the bundle activation time and CPU usage
during bundle activation. The test bundle consisted of multiple data files at
non-root locations, with a structure along the lines of
a/b/data.json, a/c/data.json, etc.

Improvements were seen in both CPU usage and activation time compared
to the older approach of performing a read while writing each file. This can be
attributed to no longer reading all of the data under "a" in the test bundle for
every write, as was the case earlier.

Fixes: open-policy-agent#4898

Signed-off-by: Ashutosh Narkar <anarkar4387@gmail.com>
ashutosh-narkar committed Jul 21, 2022
1 parent 5204239 commit 89e22df
Showing 2 changed files with 94 additions and 39 deletions.
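
Before the diff, here is a minimal, self-contained sketch of the merge-before-write idea described in the commit message. It is illustrative only: buildTree mirrors the mktree helper added in this commit, and mergeMaps is a simplified stand-in for the internal/merge.InterfaceMaps call the real code uses (both report a conflict instead of overwriting). The actual change accumulates one merged document across all iterator updates and then writes each top-level key to the store once.

package main

import (
    "fmt"
    "strings"
)

// buildTree nests value under the given path segments,
// e.g. ["a", "b"] and v become {"a": {"b": v}}.
func buildTree(path []string, value interface{}) interface{} {
    for i := len(path) - 1; i >= 0; i-- {
        value = map[string]interface{}{path[i]: value}
    }
    return value
}

// mergeMaps merges b into a copy of a, reporting false on a conflict
// (two values at the same key that are not both objects).
func mergeMaps(a, b map[string]interface{}) (map[string]interface{}, bool) {
    out := map[string]interface{}{}
    for k, v := range a {
        out[k] = v
    }
    for k, v := range b {
        if existing, found := out[k]; found {
            em, okA := existing.(map[string]interface{})
            vm, okB := v.(map[string]interface{})
            if !okA || !okB {
                return nil, false
            }
            merged, ok := mergeMaps(em, vm)
            if !ok {
                return nil, false
            }
            out[k] = merged
            continue
        }
        out[k] = v
    }
    return out, true
}

func main() {
    // Two data files at non-root locations, as in the commit message example.
    files := map[string]interface{}{
        "a/b/data.json": map[string]interface{}{"x": 1},
        "a/c/data.json": map[string]interface{}{"y": 2},
    }

    // Merge everything up front instead of reading the store per file.
    merged := map[string]interface{}{}
    for path, doc := range files {
        segs := strings.Split(path, "/")
        tree := buildTree(segs[:len(segs)-1], doc).(map[string]interface{})

        var ok bool
        if merged, ok = mergeMaps(merged, tree); !ok {
            panic("conflicting data files")
        }
    }

    // A single write per top-level key replaces one read+write per file.
    fmt.Println(merged) // map[a:map[b:map[x:1] c:map[y:2]]]
}

Compared to the previous per-file read-then-write, the store is no longer consulted for every data file, which is where the CPU and activation-time savings come from.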
99 changes: 60 additions & 39 deletions storage/inmem/inmem.go
@@ -19,9 +19,13 @@ import (
     "context"
     "fmt"
     "io"
+    "path/filepath"
+    "strings"
     "sync"
     "sync/atomic"
 
+    "github.com/open-policy-agent/opa/internal/merge"
+
     "github.com/open-policy-agent/opa/storage"
     "github.com/open-policy-agent/opa/util"
 )
@@ -96,6 +100,7 @@ func (db *store) NewTransaction(_ context.Context, params ...storage.TransactionParams) (storage.Transaction, error) {
 func (db *store) Truncate(ctx context.Context, txn storage.Transaction, _ storage.TransactionParams, it storage.Iterator) error {
     var update *storage.Update
     var err error
+    var mergedData map[string]interface{}
 
     underlying, err := db.underlying(txn)
     if err != nil {
@@ -114,52 +119,29 @@ func (db *store) Truncate(ctx context.Context, txn storage.Transaction, _ storage.TransactionParams, it storage.Iterator) error {
                 return err
             }
         } else {
-            if len(update.Path) > 0 {
-                var obj interface{}
-                err = util.Unmarshal(update.Value, &obj)
-                if err != nil {
-                    return err
-                }
-
-                _, err := underlying.Read(update.Path[:len(update.Path)-1])
-                if err != nil {
-                    if !storage.IsNotFound(err) {
-                        return err
-                    }
-
-                    if err := storage.MakeDir(ctx, db, txn, update.Path[:len(update.Path)-1]); err != nil {
-                        return err
-                    }
-                }
-
-                err = underlying.Write(storage.AddOp, update.Path, obj)
-                if err != nil {
-                    return err
-                }
-            } else {
-                // write operation at root path
-
-                var val map[string]interface{}
-                err := util.Unmarshal(update.Value, &val)
-                if err != nil {
-                    return invalidPatchError(rootMustBeObjectMsg)
-                }
-
-                for k := range val {
-                    newPath, ok := storage.ParsePathEscaped("/" + k)
-                    if !ok {
-                        return fmt.Errorf("storage path invalid: %v", newPath)
-                    }
-
-                    if err := storage.MakeDir(ctx, db, txn, newPath[:len(newPath)-1]); err != nil {
-                        return err
-                    }
-
-                    err = underlying.Write(storage.AddOp, newPath, val[k])
-                    if err != nil {
-                        return err
-                    }
-                }
-            }
+            var value interface{}
+            err = util.Unmarshal(update.Value, &value)
+            if err != nil {
+                return err
+            }
+
+            var key []string
+            dirpath := strings.TrimLeft(update.Path.String(), "/")
+            if len(dirpath) > 0 {
+                key = strings.Split(dirpath, "/")
+            }
+
+            if value != nil {
+                obj, err := mktree(key, value)
+                if err != nil {
+                    return err
+                }
+
+                merged, ok := merge.InterfaceMaps(mergedData, obj)
+                if !ok {
+                    return fmt.Errorf("failed to insert data file from path %s", filepath.Join(key...))
+                }
+                mergedData = merged
+            }
         }
     }
@@ -168,6 +150,24 @@ func (db *store) Truncate(ctx context.Context, txn storage.Transaction, _ storage.TransactionParams, it storage.Iterator) error {
         return err
     }
 
+    // write merged data to store
+    for k := range mergedData {
+        newPath, ok := storage.ParsePathEscaped("/" + k)
+        if !ok {
+            return fmt.Errorf("storage path invalid: %v", newPath)
+        }
+
+        if len(newPath) > 0 {
+            if err := storage.MakeDir(ctx, db, txn, newPath[:len(newPath)-1]); err != nil {
+                return err
+            }
+        }
+
+        if err := underlying.Write(storage.AddOp, newPath, mergedData[k]); err != nil {
+            return err
+        }
+    }
+
     return nil
 }
 

@@ -327,3 +327,24 @@ func invalidPatchError(f string, a ...interface{}) *storage.Error {
         Message: fmt.Sprintf(f, a...),
     }
 }
+
+func mktree(path []string, value interface{}) (map[string]interface{}, error) {
+    if len(path) == 0 {
+        // For 0 length path the value is the full tree.
+        obj, ok := value.(map[string]interface{})
+        if !ok {
+            return nil, invalidPatchError(rootMustBeObjectMsg)
+        }
+        return obj, nil
+    }
+
+    dir := map[string]interface{}{}
+    for i := len(path) - 1; i > 0; i-- {
+        dir[path[i]] = value
+        value = dir
+        dir = map[string]interface{}{}
+    }
+    dir[path[0]] = value
+
+    return dir, nil
+}
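
For reference, a hypothetical fragment (not part of the commit) showing what the new mktree helper produces for a data file located at /a/b:

tree, err := mktree([]string{"a", "b"}, map[string]interface{}{"c": "foo"})
if err != nil {
    // unreachable here: mktree only errors for an empty path whose value is not an object
    panic(err)
}
fmt.Println(tree) // map[a:map[b:map[c:foo]]]

With an empty path, the unmarshalled value itself must already be an object; otherwise mktree returns the rootMustBeObjectMsg patch error, matching the old behaviour for root writes.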
34 changes: 34 additions & 0 deletions storage/inmem/inmem_test.go
@@ -476,6 +476,40 @@ func TestTruncate(t *testing.T) {
     }
 }
 
+func TestTruncateDataMergeError(t *testing.T) {
+    ctx := context.Background()
+    store := NewFromObject(map[string]interface{}{})
+    txn := storage.NewTransactionOrDie(ctx, store, storage.WriteParams)
+
+    var archiveFiles = map[string]string{
+        "/a/b/data.json": `{"c": "foo"}`,
+        "/data.json":     `{"a": {"b": {"c": "bar"}}}`,
+    }
+
+    var files [][2]string
+    for name, content := range archiveFiles {
+        files = append(files, [2]string{name, content})
+    }
+
+    buf := archive.MustWriteTarGz(files)
+    b, err := bundle.NewReader(buf).WithLazyLoadingMode(true).Read()
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    iterator := bundle.NewIterator(b.Raw)
+
+    err = store.Truncate(ctx, txn, storage.WriteParams, iterator)
+    if err == nil {
+        t.Fatal("Expected truncate error but got nil")
+    }
+
+    expected := "failed to insert data file from path a/b"
+    if err.Error() != expected {
+        t.Fatalf("Expected error %v but got %v", expected, err.Error())
+    }
+}
+
 func TestTruncateBadRootWrite(t *testing.T) {
     ctx := context.Background()
     store := NewFromObject(map[string]interface{}{})
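
The new TestTruncateDataMergeError case exercises the conflict path introduced by the merge-based approach: /a/b/data.json and /data.json both define a value under a/b/c, so the per-file trees cannot be merged and Truncate reports the "failed to insert data file from path a/b" error instead of letting one file silently overwrite the other.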
