Skip to content

Commit

Permalink
fix(compact): close vlog after the compaction at L0 has completed (#1752
Browse files Browse the repository at this point in the history
)

Vlog needs to stay open while the compaction is being run. With the CompactL0OnClose option, it becomes necessary to close the vlog after all the compactions have been completed.

(cherry picked from commit cba20b9)
  • Loading branch information
NamanJain8 committed Oct 5, 2021
1 parent 4298c58 commit 09a6157
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 7 deletions.
12 changes: 6 additions & 6 deletions db.go
Expand Up @@ -559,11 +559,6 @@ func (db *DB) close() (err error) {
db.closers.pub.SignalAndWait()
db.closers.cacheHealth.Signal()

// Now close the value log.
if vlogErr := db.vlog.Close(); vlogErr != nil {
err = y.Wrap(vlogErr, "DB.Close")
}

// Make sure that block writer is done pushing stuff into memtable!
// Otherwise, you will have a race condition: we are trying to flush memtables
// and remove them completely, while the block / memtable writer is still
Expand Down Expand Up @@ -619,6 +614,11 @@ func (db *DB) close() (err error) {
}
}

// Now close the value log.
if vlogErr := db.vlog.Close(); vlogErr != nil {
err = y.Wrap(vlogErr, "DB.Close")
}

db.opt.Infof(db.LevelsToString())
if lcErr := db.lc.close(); err == nil {
err = y.Wrap(lcErr, "DB.Close")
Expand Down Expand Up @@ -1887,7 +1887,7 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
drain := func() {
for {
select {
case <- s.sendCh:
case <-s.sendCh:
default:
return
}
Expand Down
31 changes: 30 additions & 1 deletion db_test.go
Expand Up @@ -2088,7 +2088,7 @@ func TestVerifyChecksum(t *testing.T) {
y.Check2(rand.Read(value))
st := 0

buf := z.NewBuffer(10 << 20, "test")
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
for i := 0; i < 1000; i++ {
key := make([]byte, 8)
Expand Down Expand Up @@ -2509,3 +2509,32 @@ func TestBannedAtZeroOffset(t *testing.T) {
require.NoError(t, err)
})
}

func TestCompactL0OnClose(t *testing.T) {
opt := getTestOptions("")
opt.CompactL0OnClose = true
opt.ValueThreshold = 1 // Every value goes to value log
opt.NumVersionsToKeep = 1
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
var keys [][]byte
val := make([]byte, 1<<12)
for i := 0; i < 10; i++ {
key := make([]byte, 40)
_, err := rand.Read(key)
require.NoError(t, err)
keys = append(keys, key)

err = db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry(key, val))
})
require.NoError(t, err)
}

for _, key := range keys {
err := db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry(key, val))
})
require.NoError(t, err)
}
})
}

0 comments on commit 09a6157

Please sign in to comment.