Skip to content

Commit

Permalink
Fix manifest corruption dgraph-io#1756
Browse files Browse the repository at this point in the history
  • Loading branch information
mYmNeo committed Feb 13, 2023
1 parent 6a32b8a commit 10be6ca
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 7 deletions.
15 changes: 8 additions & 7 deletions manifest.go
Expand Up @@ -27,10 +27,11 @@ import (
"path/filepath"
"sync"

"github.com/dgraph-io/badger/pb"
"github.com/dgraph-io/badger/y"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"

"github.com/dgraph-io/badger/pb"
"github.com/dgraph-io/badger/y"
)

// Manifest represents the contents of the MANIFEST file in a Badger store.
Expand Down Expand Up @@ -194,15 +195,15 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error {

// Maybe we could use O_APPEND instead (on certain file systems)
mf.appendLock.Lock()
defer mf.appendLock.Unlock()

if err := applyChangeSet(&mf.manifest, &changes); err != nil {
mf.appendLock.Unlock()
return err
}
// Rewrite manifest if it'd shrink by 1/10 and it's big enough to care
if mf.manifest.Deletions > mf.deletionsRewriteThreshold &&
mf.manifest.Deletions > manifestDeletionsRatio*(mf.manifest.Creations-mf.manifest.Deletions) {
if err := mf.rewrite(); err != nil {
mf.appendLock.Unlock()
return err
}
} else {
Expand All @@ -211,15 +212,15 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error {
binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(buf, y.CastagnoliCrcTable))
buf = append(lenCrcBuf[:], buf...)
if _, err := mf.fp.Write(buf); err != nil {
mf.appendLock.Unlock()
return err
}
}

mf.appendLock.Unlock()
return y.FileSync(mf.fp)
return syncFunc(mf.fp)
}

// syncFunc is the function addChanges calls on the manifest file handle to
// sync it before returning. By default it delegates to y.FileSync. It is a
// package-level variable rather than a direct call so that tests can swap in
// a different implementation (for example a deliberately slow sync).
var syncFunc = func(f *os.File) error { return y.FileSync(f) }

// magicText is the fixed byte sequence "Bdgr". It has to be exactly 4 bytes,
// and the value can never change, ever.
// NOTE(review): presumably this is the magic marker identifying a MANIFEST
// file on disk — its use is not visible here; confirm against the reader/writer.
var magicText = [4]byte{'B', 'd', 'g', 'r'}

Expand Down
43 changes: 43 additions & 0 deletions manifest_test.go
Expand Up @@ -23,7 +23,9 @@ import (
"os"
"path/filepath"
"sort"
"sync"
"testing"
"time"

"golang.org/x/net/trace"

Expand Down Expand Up @@ -240,3 +242,44 @@ func TestManifestRewrite(t *testing.T) {
uint64(deletionsThreshold * 3): {Level: 0, Checksum: []byte{}},
}, m.Tables)
}

// TestConcurrentManifestCompaction calls addChanges from two goroutines at
// once on a manifest whose deletions threshold is set very low, with the sync
// step artificially slowed down, and requires that both calls and the final
// close succeed.
func TestConcurrentManifestCompaction(t *testing.T) {
	dir, err := ioutil.TempDir("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)

	// Set this low so rewrites will happen more often.
	deletionsThreshold := 1

	// Overwrite the sync function to make this race condition easily
	// reproducible. syncFunc is package-level state, so put the original back
	// when the test finishes; otherwise every later test in the package would
	// inherit the slow sync.
	origSync := syncFunc
	defer func() { syncFunc = origSync }()
	syncFunc = func(f *os.File) error {
		// Effectively making the Sync() take around 1s makes this reproduce
		// every time.
		time.Sleep(1 * time.Second)
		return f.Sync()
	}

	mf, _, err := helpOpenOrCreateManifestFile(dir, false, deletionsThreshold)
	require.NoError(t, err)

	cs := &pb.ManifestChangeSet{}
	for i := uint64(0); i < 1000; i++ {
		cs.Changes = append(cs.Changes,
			newCreateChange(i, 0, nil),
			newDeleteChange(i),
		)
	}

	// Simulate 2 concurrent compaction threads. Each goroutine records its
	// result in its own slot; the assertions run afterwards on the test
	// goroutine, because require.* ends in t.FailNow, which must not be
	// called from any other goroutine.
	const n = 2
	errs := make([]error, n)
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(slot int) {
			defer wg.Done()
			errs[slot] = mf.addChanges(cs.Changes)
		}(i)
	}
	wg.Wait()

	for _, addErr := range errs {
		require.NoError(t, addErr)
	}

	require.NoError(t, mf.close())
}

0 comments on commit 10be6ca

Please sign in to comment.