identity: Add batch entity deletion endpoint (#8785)
* identity: Add batch entity deletion endpoint

* Update the parameter description

* Update error message

* Update helper/storagepacker/storagepacker.go

Co-Authored-By: Vishal Nayak <vishalnayak@users.noreply.github.com>

* Review feedback

* Update vault/identity_store_entities.go

Co-Authored-By: Calvin Leung Huang <cleung2010@gmail.com>

Co-authored-by: Vishal Nayak <vishalnayak@users.noreply.github.com>
Co-authored-by: Calvin Leung Huang <cleung2010@gmail.com>
3 people committed Apr 30, 2020
1 parent 5a28e41 commit 5928c32
Showing 5 changed files with 308 additions and 93 deletions.
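
For context, a minimal sketch of how a client might drive the batch deletion this commit adds, assuming the identity/entity/batch-delete path and entity_ids parameter described in the PR; the client setup is the standard Vault API client and the entity IDs are placeholders. Batching the IDs into one request is what lets the server group deletions by storage bucket, which the storagepacker change below enables.

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/vault/api"
)

func main() {
	// Standard Vault API client; address and token come from the
	// environment (VAULT_ADDR, VAULT_TOKEN).
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// entity_ids is the batch parameter this change adds; these IDs are
	// placeholders for real entity IDs.
	_, err = client.Logical().Write("identity/entity/batch-delete", map[string]interface{}{
		"entity_ids": []string{
			"entity-id-1",
			"entity-id-2",
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("batch delete submitted")
}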
145 changes: 60 additions & 85 deletions helper/storagepacker/storagepacker.go
@@ -6,7 +6,9 @@ import (
 	"fmt"
 	"strconv"
 	"strings"
+	"time"
 
+	"github.com/armon/go-metrics"
 	"github.com/golang/protobuf/proto"
 	"github.com/hashicorp/errwrap"
 	"github.com/hashicorp/go-hclog"
@@ -125,112 +127,85 @@ func (s *StoragePacker) BucketKey(itemID string) string {
 
 // DeleteItem removes the item from the respective bucket
 func (s *StoragePacker) DeleteItem(_ context.Context, itemID string) error {
-	return s.DeleteMultipleItems(context.Background(), nil, itemID)
+	return s.DeleteMultipleItems(context.Background(), nil, []string{itemID})
 }
 
-func (s *StoragePacker) DeleteMultipleItems(ctx context.Context, logger hclog.Logger, itemIDs ...string) error {
-	var err error
-	switch len(itemIDs) {
-	case 0:
-		// Nothing
+func (s *StoragePacker) DeleteMultipleItems(ctx context.Context, logger hclog.Logger, itemIDs []string) error {
+	defer metrics.MeasureSince([]string{"storage_packer", "delete_items"}, time.Now())
+	if len(itemIDs) == 0 {
 		return nil
-
-	case 1:
-		logger = hclog.NewNullLogger()
-		fallthrough
-
-	default:
-		lockIndexes := make(map[string]struct{}, len(s.storageLocks))
-		for _, itemID := range itemIDs {
-			bucketKey := s.BucketKey(itemID)
-			if _, ok := lockIndexes[bucketKey]; !ok {
-				lockIndexes[bucketKey] = struct{}{}
-			}
-		}
-
-		lockKeys := make([]string, 0, len(lockIndexes))
-		for k := range lockIndexes {
-			lockKeys = append(lockKeys, k)
-		}
-
-		locks := locksutil.LocksForKeys(s.storageLocks, lockKeys)
-		for _, lock := range locks {
-			lock.Lock()
-			defer lock.Unlock()
-		}
 	}
 
 	if logger == nil {
 		logger = hclog.NewNullLogger()
 	}
 
-	bucketCache := make(map[string]*Bucket, len(s.storageLocks))
+	// Sort the ids by the bucket they will be deleted from
+	lockKeys := make([]string, 0)
+	byBucket := make(map[string]map[string]struct{})
+	for _, id := range itemIDs {
+		bucketKey := s.BucketKey(id)
+		bucket, ok := byBucket[bucketKey]
+		if !ok {
+			bucket = make(map[string]struct{})
+			byBucket[bucketKey] = bucket
+
+			// Add the lock key once
+			lockKeys = append(lockKeys, bucketKey)
+		}
+
+		bucket[id] = struct{}{}
+	}
+
+	locks := locksutil.LocksForKeys(s.storageLocks, lockKeys)
+	for _, lock := range locks {
+		lock.Lock()
+		defer lock.Unlock()
+	}
 
 	logger.Debug("deleting multiple items from storagepacker; caching and deleting from buckets", "total_items", len(itemIDs))
 
-	var pctDone int
-	for idx, itemID := range itemIDs {
-		bucketKey := s.BucketKey(itemID)
-
-		bucket, bucketFound := bucketCache[bucketKey]
-		if !bucketFound {
-			// Read from storage
-			storageEntry, err := s.view.Get(context.Background(), bucketKey)
-			if err != nil {
-				return errwrap.Wrapf("failed to read packed storage value: {{err}}", err)
-			}
-			if storageEntry == nil {
-				return nil
-			}
-
-			uncompressedData, notCompressed, err := compressutil.Decompress(storageEntry.Value)
-			if err != nil {
-				return errwrap.Wrapf("failed to decompress packed storage value: {{err}}", err)
-			}
-			if notCompressed {
-				uncompressedData = storageEntry.Value
-			}
-
-			bucket = new(Bucket)
-			err = proto.Unmarshal(uncompressedData, bucket)
-			if err != nil {
-				return errwrap.Wrapf("failed decoding packed storage entry: {{err}}", err)
-			}
+	// For each bucket, load from storage, remove the necessary items, and
+	// write it back out to storage
+	pctDone := 0
+	idx := 0
+	for bucketKey, itemsToRemove := range byBucket {
+		// Read bucket from storage
+		storageEntry, err := s.view.Get(context.Background(), bucketKey)
+		if err != nil {
+			return errwrap.Wrapf("failed to read packed storage value: {{err}}", err)
+		}
+		if storageEntry == nil {
+			logger.Warn("could not find bucket", "bucket", bucketKey)
+			continue
 		}
 
-		// Look for a matching storage entry
-		foundIdx := -1
-		for itemIdx, item := range bucket.Items {
-			if item.ID == itemID {
-				foundIdx = itemIdx
-				break
-			}
+		uncompressedData, notCompressed, err := compressutil.Decompress(storageEntry.Value)
+		if err != nil {
+			return errwrap.Wrapf("failed to decompress packed storage value: {{err}}", err)
+		}
+		if notCompressed {
+			uncompressedData = storageEntry.Value
 		}
 
-		// If there is a match, remove it from the collection and persist the
-		// resulting collection
-		if foundIdx != -1 {
-			bucket.Items[foundIdx] = bucket.Items[len(bucket.Items)-1]
-			bucket.Items = bucket.Items[:len(bucket.Items)-1]
-			if !bucketFound {
-				bucketCache[bucketKey] = bucket
-			}
+		bucket := new(Bucket)
+		err = proto.Unmarshal(uncompressedData, bucket)
+		if err != nil {
+			return errwrap.Wrapf("failed decoding packed storage entry: {{err}}", err)
		}
 
-		newPctDone := idx * 100.0 / len(itemIDs)
-		if int(newPctDone) > pctDone {
-			pctDone = int(newPctDone)
-			logger.Trace("bucket item removal progress", "percent", pctDone, "items_removed", idx)
+		// Look for matching storage entries and delete them from the list.
+		for i := 0; i < len(bucket.Items); i++ {
+			if _, ok := itemsToRemove[bucket.Items[i].ID]; ok {
+				bucket.Items[i] = bucket.Items[len(bucket.Items)-1]
+				bucket.Items = bucket.Items[:len(bucket.Items)-1]
+
+				// Since we just moved a value to position i we need to
+				// decrement i so we replay this position
+				i--
+			}
 		}
-	}
 
-	logger.Debug("persisting buckets", "total_buckets", len(bucketCache))
-
-	// Persist all buckets in the cache; these will be the ones that had
-	// deletions
-	pctDone = 0
-	idx := 0
-	for _, bucket := range bucketCache {
 		// Fail if the context is canceled, the storage calls will fail anyways
 		if ctx.Err() != nil {
 			return ctx.Err()
@@ -241,7 +216,7 @@ func (s *StoragePacker) DeleteMultipleItems(ctx context.Context, logger hclog.Lo
 			return err
 		}
 
-		newPctDone := idx * 100.0 / len(bucketCache)
+		newPctDone := idx * 100.0 / len(byBucket)
 		if int(newPctDone) > pctDone {
 			pctDone = int(newPctDone)
 			logger.Trace("bucket persistence progress", "percent", pctDone, "buckets_persisted", idx)
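
The rewritten loop above combines two techniques: IDs are grouped by bucket so each bucket is loaded and persisted exactly once, and matching items are removed with a swap-remove that replays the current index. A standalone sketch of both, reduced to plain maps and slices so it runs on its own; bucketFor and items are illustrative stand-ins, not storagepacker APIs:

package main

import "fmt"

func bucketFor(id string) string {
	// Stand-in for StoragePacker.BucketKey: first byte picks the bucket.
	return string(id[0])
}

func main() {
	items := map[string][]string{
		"a": {"a1", "a2", "a3"},
		"b": {"b1", "b2"},
	}
	toDelete := []string{"a1", "a3", "b2"}

	// Group IDs by bucket so each bucket is read and written exactly once.
	byBucket := make(map[string]map[string]struct{})
	for _, id := range toDelete {
		key := bucketFor(id)
		if byBucket[key] == nil {
			byBucket[key] = make(map[string]struct{})
		}
		byBucket[key][id] = struct{}{}
	}

	// Swap-remove: overwrite position i with the last element, shrink the
	// slice, and replay index i since it now holds an unexamined value.
	for key, remove := range byBucket {
		bucket := items[key]
		for i := 0; i < len(bucket); i++ {
			if _, ok := remove[bucket[i]]; ok {
				bucket[i] = bucket[len(bucket)-1]
				bucket = bucket[:len(bucket)-1]
				i--
			}
		}
		items[key] = bucket
	}
	fmt.Println(items) // map[a:[a2] b:[b1]]
}

Swap-remove is O(1) per removal but does not preserve order, which is acceptable here because a bucket's items behave as an unordered set.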
55 changes: 54 additions & 1 deletion helper/storagepacker/storagepacker_test.go
@@ -217,7 +217,7 @@ func TestStoragePacker_DeleteMultiple(t *testing.T) {
 		itemsToDelete = append(itemsToDelete, fmt.Sprintf("item%d", i))
 	}
 
-	err = storagePacker.DeleteMultipleItems(ctx, nil, itemsToDelete...)
+	err = storagePacker.DeleteMultipleItems(ctx, nil, itemsToDelete)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -237,3 +237,56 @@ func TestStoragePacker_DeleteMultiple(t *testing.T) {
 		}
 	}
 }
+
+func TestStoragePacker_DeleteMultiple_ALL(t *testing.T) {
+	storagePacker, err := NewStoragePacker(&logical.InmemStorage{}, log.New(&log.LoggerOptions{Name: "storagepackertest"}), "")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ctx := context.Background()
+
+	// Persist a storage entry
+	itemsToDelete := make([]string, 0, 10000)
+	for i := 0; i < 10000; i++ {
+		item := &Item{
+			ID: fmt.Sprintf("item%d", i),
+		}
+
+		err = storagePacker.PutItem(ctx, item)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		// Verify that it can be read
+		fetchedItem, err := storagePacker.GetItem(item.ID)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if fetchedItem == nil {
+			t.Fatalf("failed to read the stored item")
+		}
+
+		if item.ID != fetchedItem.ID {
+			t.Fatalf("bad: item ID; expected: %q\n actual: %q\n", item.ID, fetchedItem.ID)
+		}
+
+		itemsToDelete = append(itemsToDelete, fmt.Sprintf("item%d", i))
+	}
+
+	err = storagePacker.DeleteMultipleItems(ctx, nil, itemsToDelete)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Check that the deletion was successful
+	for _, item := range itemsToDelete {
+		fetchedItem, err := storagePacker.GetItem(item)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if fetchedItem != nil {
+			t.Fatal("item not deleted")
+		}
+	}
+}
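
The persistence loop in storagepacker.go also short-circuits on a canceled context before each write ("the storage calls will fail anyways"). A minimal sketch of that pattern, assuming a hypothetical persistAll helper rather than the storagepacker code itself:

package main

import (
	"context"
	"fmt"
	"time"
)

func persistAll(ctx context.Context, keys []string) error {
	for _, key := range keys {
		// Abort before each storage write if the caller gave up; the
		// write would fail anyway once the context is done.
		if ctx.Err() != nil {
			return ctx.Err()
		}
		fmt.Println("persisted", key)
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	time.Sleep(2 * time.Millisecond) // let the deadline pass
	if err := persistAll(ctx, []string{"bucket/0", "bucket/1"}); err != nil {
		fmt.Println("aborted:", err)
	}
}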
