Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(compact): close vlog after the compaction at L0 has completed #1752

Merged
merged 1 commit into from Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions db.go
Expand Up @@ -559,11 +559,6 @@ func (db *DB) close() (err error) {
db.closers.pub.SignalAndWait()
db.closers.cacheHealth.Signal()

// Now close the value log.
if vlogErr := db.vlog.Close(); vlogErr != nil {
err = y.Wrap(vlogErr, "DB.Close")
}

// Make sure that block writer is done pushing stuff into memtable!
// Otherwise, you will have a race condition: we are trying to flush memtables
// and remove them completely, while the block / memtable writer is still
Expand Down Expand Up @@ -619,6 +614,11 @@ func (db *DB) close() (err error) {
}
}

// Now close the value log.
if vlogErr := db.vlog.Close(); vlogErr != nil {
err = y.Wrap(vlogErr, "DB.Close")
}

db.opt.Infof(db.LevelsToString())
if lcErr := db.lc.close(); err == nil {
err = y.Wrap(lcErr, "DB.Close")
Expand Down Expand Up @@ -2113,7 +2113,7 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
drain := func() {
for {
select {
case <- s.sendCh:
case <-s.sendCh:
default:
return
}
Expand Down
29 changes: 29 additions & 0 deletions db_test.go
Expand Up @@ -2547,3 +2547,32 @@ func TestSeekTs(t *testing.T) {
}
})
}

func TestCompactL0OnClose(t *testing.T) {
opt := getTestOptions("")
opt.CompactL0OnClose = true
opt.ValueThreshold = 1 // Every value goes to value log
opt.NumVersionsToKeep = 1
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
var keys [][]byte
val := make([]byte, 1<<12)
for i := 0; i < 10; i++ {
key := make([]byte, 40)
_, err := rand.Read(key)
require.NoError(t, err)
keys = append(keys, key)

err = db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry(key, val))
})
require.NoError(t, err)
}

for _, key := range keys {
err := db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry(key, val))
})
require.NoError(t, err)
}
})
}