diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9dc49bac2..aae5f0f9b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,16 @@
 All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 
+## [3.2103.2] - 2021-10-07
+
+### Fixed
+
+ - fix(compact): close vlog after the compaction at L0 has completed (#1752)
+ - fix(builder): put the upper limit on reallocation (#1748)
+ - deps: Bump github.com/google/flatbuffers to v1.12.1 (#1746)
+ - fix(levels): Avoid a deadlock when acquiring read locks in levels (#1744)
+ - fix(pubsub): avoid deadlock in publisher and subscriber (#1749) (#1751)
+
 ## [3.2103.1] - 2021-07-08
 
 ### Fixed
diff --git a/db.go b/db.go
index 85fdb6ee8..3ac6a2d95 100644
--- a/db.go
+++ b/db.go
@@ -559,11 +559,6 @@ func (db *DB) close() (err error) {
 	db.closers.pub.SignalAndWait()
 	db.closers.cacheHealth.Signal()
 
-	// Now close the value log.
-	if vlogErr := db.vlog.Close(); vlogErr != nil {
-		err = y.Wrap(vlogErr, "DB.Close")
-	}
-
 	// Make sure that block writer is done pushing stuff into memtable!
 	// Otherwise, you will have a race condition: we are trying to flush memtables
 	// and remove them completely, while the block / memtable writer is still
@@ -619,6 +614,11 @@
 		}
 	}
+	// Now close the value log.
+	if vlogErr := db.vlog.Close(); vlogErr != nil {
+		err = y.Wrap(vlogErr, "DB.Close")
+	}
+
 	db.opt.Infof(db.LevelsToString())
 	if lcErr := db.lc.close(); err == nil {
 		err = y.Wrap(lcErr, "DB.Close")
 	}
@@ -1887,7 +1887,7 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
 	drain := func() {
 		for {
 			select {
-			case <- s.sendCh:
+			case <-s.sendCh:
 			default:
 				return
 			}
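A note on the `drain` helper in the db.go Subscribe hunk above: the pubsub fix (#1749, #1751) depends on the subscriber being able to empty its channel without ever blocking, so a publisher waiting to send is never stuck behind a parked subscriber. Below is a minimal, standalone sketch of that select-with-default pattern; the channel type and names are illustrative, not Badger's actual `*KVList` plumbing.

```go
package main

import "fmt"

// drain empties ch without blocking. The default case fires as soon as
// the channel has nothing buffered, so the loop returns instead of
// parking the goroutine on an empty channel.
func drain(ch chan int) {
	for {
		select {
		case v := <-ch:
			fmt.Println("discarded:", v)
		default:
			return
		}
	}
}

func main() {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2
	drain(ch) // returns immediately once ch is empty
	fmt.Println("buffered after drain:", len(ch))
}
```

The `default` case is what keeps the loop non-blocking: a bare receive loop would park once the channel is empty, while this one always terminates.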
diff --git a/db_test.go b/db_test.go
index 079a93974..0535adcb2 100644
--- a/db_test.go
+++ b/db_test.go
@@ -2088,7 +2088,7 @@ func TestVerifyChecksum(t *testing.T) {
 		y.Check2(rand.Read(value))
 		st := 0
 
-		buf := z.NewBuffer(10 << 20, "test")
+		buf := z.NewBuffer(10<<20, "test")
 		defer buf.Release()
 		for i := 0; i < 1000; i++ {
 			key := make([]byte, 8)
@@ -2509,3 +2509,32 @@ func TestBannedAtZeroOffset(t *testing.T) {
 		require.NoError(t, err)
 	})
 }
+
+func TestCompactL0OnClose(t *testing.T) {
+	opt := getTestOptions("")
+	opt.CompactL0OnClose = true
+	opt.ValueThreshold = 1 // Every value goes to the value log.
+	opt.NumVersionsToKeep = 1
+	runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
+		var keys [][]byte
+		val := make([]byte, 1<<12)
+		for i := 0; i < 10; i++ {
+			key := make([]byte, 40)
+			_, err := rand.Read(key)
+			require.NoError(t, err)
+			keys = append(keys, key)
+
+			err = db.Update(func(txn *Txn) error {
+				return txn.SetEntry(NewEntry(key, val))
+			})
+			require.NoError(t, err)
+		}
+
+		for _, key := range keys {
+			err := db.Update(func(txn *Txn) error {
+				return txn.SetEntry(NewEntry(key, val))
+			})
+			require.NoError(t, err)
+		}
+	})
+}
diff --git a/go.mod b/go.mod
index 997b9d536..28da3cc36 100644
--- a/go.mod
+++ b/go.mod
@@ -11,7 +11,7 @@ require (
 	github.com/gogo/protobuf v1.3.2
 	github.com/golang/protobuf v1.3.1
 	github.com/golang/snappy v0.0.3
-	github.com/google/flatbuffers v1.12.0
+	github.com/google/flatbuffers v1.12.1
 	github.com/google/go-cmp v0.5.4 // indirect
 	github.com/klauspost/compress v1.12.3
 	github.com/kr/pretty v0.1.0 // indirect
diff --git a/go.sum b/go.sum
index 258863751..cdeb960ea 100644
--- a/go.sum
+++ b/go.sum
@@ -34,8 +34,8 @@ github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg
 github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
 github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
-github.com/google/flatbuffers v1.12.0 h1:/PtAHvnBY4Kqnx/xCQ3OIV9uYcSFGScBsWI3Oogeh6w=
-github.com/google/flatbuffers v1.12.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
+github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw=
+github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
 github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
diff --git a/levels.go b/levels.go
index 637a3565d..1c625d1b3 100644
--- a/levels.go
+++ b/levels.go
@@ -1119,8 +1119,22 @@ func (s *levelsController) fillTablesL0ToL0(cd *compactDef) bool {
 	cd.nextRange = keyRange{}
 	cd.bot = nil
 
-	cd.lockLevels()
-	defer cd.unlockLevels()
+	// Because this level and the next level are both level 0, we must NOT
+	// acquire the read lock twice: doing so can result in a deadlock. So we
+	// don't call compactDef.lockLevels; instead we lock the level only once,
+	// directly here.
+	//
+	// As per the godoc on RWMutex:
+	// If a goroutine holds a RWMutex for reading and another goroutine might
+	// call Lock, no goroutine should expect to be able to acquire a read lock
+	// until the initial read lock is released. In particular, this prohibits
+	// recursive read locking. This is to ensure that the lock eventually
+	// becomes available; a blocked Lock call excludes new readers from
+	// acquiring the lock.
+	y.AssertTrue(cd.thisLevel.level == 0)
+	y.AssertTrue(cd.nextLevel.level == 0)
+	s.levels[0].RLock()
+	defer s.levels[0].RUnlock()
 
 	s.cstatus.Lock()
 	defer s.cstatus.Unlock()
diff --git a/table/builder.go b/table/builder.go
index 8322bb86f..5bab48a72 100644
--- a/table/builder.go
+++ b/table/builder.go
@@ -100,8 +100,12 @@ type Builder struct {
 func (b *Builder) allocate(need int) []byte {
 	bb := b.curBlock
 	if len(bb.data[bb.end:]) < need {
-		// We need to reallocate.
+		// We need to reallocate. 1GB is the max size that the allocator can allocate.
+		// If doubling the current size exceeds that limit, cap the new size at 1GB.
 		sz := 2 * len(bb.data)
+		if sz > (1 << 30) {
+			sz = 1 << 30
+		}
 		if bb.end+need > sz {
 			sz = bb.end + need
 		}
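On the levels.go change: the quoted RWMutex godoc is the core of #1744. `fillTablesL0ToL0` compacts level 0 into itself, so going through `compactDef.lockLevels` would take the same level-0 read lock twice, and a writer arriving between the two acquisitions deadlocks both sides. A minimal, standalone reproduction of that hazard follows; it uses a bare `sync.RWMutex` and illustrative sleeps, not Badger code.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.RWMutex
	done := make(chan struct{})

	go func() {
		mu.RLock()
		defer mu.RUnlock()

		// Give the writer below time to block inside mu.Lock.
		time.Sleep(100 * time.Millisecond)

		// Recursive read lock: a pending Lock excludes new readers, so
		// this RLock queues behind the writer, while the writer waits
		// for our first RLock to be released. Nobody advances.
		mu.RLock()
		mu.RUnlock()
		close(done)
	}()

	time.Sleep(10 * time.Millisecond)
	go func() {
		mu.Lock() // blocks: a reader is still active
		mu.Unlock()
	}()

	select {
	case <-done:
		fmt.Println("no deadlock this run (timing-dependent)")
	case <-time.After(time.Second):
		fmt.Println("deadlocked: recursive RLock stuck behind a pending Lock")
	}
}
```

Locking `s.levels[0]` exactly once, as the patch does, removes the recursive acquisition entirely, and the two `AssertTrue` calls document the assumption that both ends of the compaction really are level 0.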
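On the table/builder.go change: `allocate` still doubles the current block, but the doubled size is now clamped to 1GB (the allocator's ceiling) before being raised again, if necessary, to fit the pending write. Below is a standalone sketch of just that sizing policy; the function name and constant are mine, not the Badger API.

```go
package main

import "fmt"

// maxAllocSize mirrors the 1GB cap from the patch: the largest buffer
// the underlying allocator is assumed to hand out.
const maxAllocSize = 1 << 30

// grow computes the next buffer size the way Builder.allocate now does:
// double the current size, clamp the doubling at maxAllocSize, then make
// sure the result still fits the bytes about to be written.
func grow(cur, end, need int) int {
	sz := 2 * cur
	if sz > maxAllocSize {
		sz = maxAllocSize
	}
	if end+need > sz {
		sz = end + need
	}
	return sz
}

func main() {
	fmt.Println(grow(4096, 4000, 200))          // 8192: plain doubling
	fmt.Println(grow(900<<20, 800<<20, 64))     // clamped to 1GB instead of 1.8GB
	fmt.Println(grow(900<<20, (1<<30)-32, 512)) // the pending write wins over the cap
}
```

Note that, as in the patch, the `end+need` adjustment runs after the clamp, so a single oversized request can still exceed 1GB; the cap only stops blind doubling from overshooting, and requests beyond the allocator's limit are presumably rejected elsewhere.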