From 8a9546c54839eca6fffe65607106a52acedcbfee Mon Sep 17 00:00:00 2001 From: Stephan Renatus Date: Wed, 30 Mar 2022 14:44:26 +0200 Subject: [PATCH] storage/disk: check prefix when adding wildcard partitions The previously done check would have falsely returned that there is no problem when adding a wildcard partition: lookup of "/foo/*" with '*' not interpreted as a wildcard, but as a string, would yield a not-found, even if there was any data under /foo/. Now, we'll check the prefix-until-wildcard. It's more cautious than theoretically necessary, but safe. Signed-off-by: Stephan Renatus --- storage/disk/disk.go | 3 +++ storage/disk/disk_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/storage/disk/disk.go b/storage/disk/disk.go index 93242e6459..2558b56718 100644 --- a/storage/disk/disk.go +++ b/storage/disk/disk.go @@ -508,6 +508,9 @@ func (db *Store) validatePartitions(ctx context.Context, txn *badger.Txn, existi } for _, path := range addedPartitions.Diff(replacements) { + if prefix, wildcard := hasWildcard(path); wildcard { + path = prefix + } for i := len(path); i > 0; i-- { key, err := db.pm.DataPath2Key(path[:i]) if err != nil { diff --git a/storage/disk/disk_test.go b/storage/disk/disk_test.go index cf14381d30..29ce0df7ff 100644 --- a/storage/disk/disk_test.go +++ b/storage/disk/disk_test.go @@ -324,6 +324,31 @@ func TestDataPartitioningValidation(t *testing.T) { } closeFn(ctx, s) + // adding a wildcard partition requires no content on the non-wildcard prefix + // we open the db with previously used partitions, write another key, and + // re-open with an extra wildcard partition + // switching to a partition with multiple wildcards + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ + storage.MustParsePath("/fox/in/*/*/*"), + storage.MustParsePath("/foo/*"), + }}) + if err != nil { + t.Fatal(err) + } + err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/peanutbutter/jelly"), true) + if err != nil { + t.Fatal(err) + } + closeFn(ctx, s) + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ + storage.MustParsePath("/fox/in/*/*/*"), + storage.MustParsePath("/peanutbutter/*"), + storage.MustParsePath("/foo/*"), + }}) + if err == nil || !strings.Contains(err.Error(), "partitions are backwards incompatible (existing data: /peanutbutter)") { + t.Fatal("expected to find existing key but got:", err) + } + closeFn(ctx, s) }) }