From 10be6ca1ea3f28195e5290cf97387e822aad9170 Mon Sep 17 00:00:00 2001 From: thomassong Date: Fri, 10 Feb 2023 10:05:00 +0800 Subject: [PATCH] Fix manifest corruption #1756 --- manifest.go | 15 ++++++++------- manifest_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/manifest.go b/manifest.go index 5a2e837e3..8fe44d63d 100644 --- a/manifest.go +++ b/manifest.go @@ -27,10 +27,11 @@ import ( "path/filepath" "sync" - "github.com/dgraph-io/badger/pb" - "github.com/dgraph-io/badger/y" "github.com/golang/protobuf/proto" "github.com/pkg/errors" + + "github.com/dgraph-io/badger/pb" + "github.com/dgraph-io/badger/y" ) // Manifest represents the contents of the MANIFEST file in a Badger store. @@ -194,15 +195,15 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error { // Maybe we could use O_APPEND instead (on certain file systems) mf.appendLock.Lock() + defer mf.appendLock.Unlock() + if err := applyChangeSet(&mf.manifest, &changes); err != nil { - mf.appendLock.Unlock() return err } // Rewrite manifest if it'd shrink by 1/10 and it's big enough to care if mf.manifest.Deletions > mf.deletionsRewriteThreshold && mf.manifest.Deletions > manifestDeletionsRatio*(mf.manifest.Creations-mf.manifest.Deletions) { if err := mf.rewrite(); err != nil { - mf.appendLock.Unlock() return err } } else { @@ -211,15 +212,15 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error { binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(buf, y.CastagnoliCrcTable)) buf = append(lenCrcBuf[:], buf...) if _, err := mf.fp.Write(buf); err != nil { - mf.appendLock.Unlock() return err } } - mf.appendLock.Unlock() - return y.FileSync(mf.fp) + return syncFunc(mf.fp) } +var syncFunc = func(f *os.File) error { return y.FileSync(f) } + // Has to be 4 bytes. The value can never change, ever, anyway. var magicText = [4]byte{'B', 'd', 'g', 'r'} diff --git a/manifest_test.go b/manifest_test.go index 9c0dba6ac..db64ec092 100644 --- a/manifest_test.go +++ b/manifest_test.go @@ -23,7 +23,9 @@ import ( "os" "path/filepath" "sort" + "sync" "testing" + "time" "golang.org/x/net/trace" @@ -240,3 +242,44 @@ func TestManifestRewrite(t *testing.T) { uint64(deletionsThreshold * 3): {Level: 0, Checksum: []byte{}}, }, m.Tables) } + +func TestConcurrentManifestCompaction(t *testing.T) { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + + // set this low so rewrites will happen more often + deletionsThreshold := 1 + + // overwrite the sync function to make this race condition easily reproducible + syncFunc = func(f *os.File) error { + // effectively making the Sync() take around 1s makes this reproduce every time + time.Sleep(1 * time.Second) + return f.Sync() + } + + mf, _, err := helpOpenOrCreateManifestFile(dir, false, deletionsThreshold) + require.NoError(t, err) + + cs := &pb.ManifestChangeSet{} + for i := uint64(0); i < 1000; i++ { + cs.Changes = append(cs.Changes, + newCreateChange(i, 0, nil), + newDeleteChange(i), + ) + } + + // simulate 2 concurrent compaction threads + n := 2 + wg := sync.WaitGroup{} + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + require.NoError(t, mf.addChanges(cs.Changes)) + }() + } + wg.Wait() + + require.NoError(t, mf.close()) +}