From 09a6157303d66d5abe881df1448064c29c14b659 Mon Sep 17 00:00:00 2001 From: Naman Jain Date: Tue, 28 Sep 2021 22:57:25 +0530 Subject: [PATCH] fix(compact): close vlog after the compaction at L0 has completed (#1752) Vlog needs to stay open while the compaction is being run. With the CompactL0OnClose option, it becomes necessary to close the vlog after all the compactions have been completed. (cherry picked from commit cba20b940c25cdd041faa58fea3577f1282affc3) --- db.go | 12 ++++++------ db_test.go | 31 ++++++++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/db.go b/db.go index 85fdb6ee8..3ac6a2d95 100644 --- a/db.go +++ b/db.go @@ -559,11 +559,6 @@ func (db *DB) close() (err error) { db.closers.pub.SignalAndWait() db.closers.cacheHealth.Signal() - // Now close the value log. - if vlogErr := db.vlog.Close(); vlogErr != nil { - err = y.Wrap(vlogErr, "DB.Close") - } - // Make sure that block writer is done pushing stuff into memtable! // Otherwise, you will have a race condition: we are trying to flush memtables // and remove them completely, while the block / memtable writer is still @@ -619,6 +614,11 @@ func (db *DB) close() (err error) { } } + // Now close the value log. + if vlogErr := db.vlog.Close(); vlogErr != nil { + err = y.Wrap(vlogErr, "DB.Close") + } + db.opt.Infof(db.LevelsToString()) if lcErr := db.lc.close(); err == nil { err = y.Wrap(lcErr, "DB.Close") @@ -1887,7 +1887,7 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches drain := func() { for { select { - case <- s.sendCh: + case <-s.sendCh: default: return } diff --git a/db_test.go b/db_test.go index 079a93974..0535adcb2 100644 --- a/db_test.go +++ b/db_test.go @@ -2088,7 +2088,7 @@ func TestVerifyChecksum(t *testing.T) { y.Check2(rand.Read(value)) st := 0 - buf := z.NewBuffer(10 << 20, "test") + buf := z.NewBuffer(10<<20, "test") defer buf.Release() for i := 0; i < 1000; i++ { key := make([]byte, 8) @@ -2509,3 +2509,32 @@ func TestBannedAtZeroOffset(t *testing.T) { require.NoError(t, err) }) } + +func TestCompactL0OnClose(t *testing.T) { + opt := getTestOptions("") + opt.CompactL0OnClose = true + opt.ValueThreshold = 1 // Every value goes to value log + opt.NumVersionsToKeep = 1 + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + var keys [][]byte + val := make([]byte, 1<<12) + for i := 0; i < 10; i++ { + key := make([]byte, 40) + _, err := rand.Read(key) + require.NoError(t, err) + keys = append(keys, key) + + err = db.Update(func(txn *Txn) error { + return txn.SetEntry(NewEntry(key, val)) + }) + require.NoError(t, err) + } + + for _, key := range keys { + err := db.Update(func(txn *Txn) error { + return txn.SetEntry(NewEntry(key, val)) + }) + require.NoError(t, err) + } + }) +}