release/v3.2103: bring fixes from master #1753

Merged: 5 commits merged into release/v3.2103 from master on Oct 7, 2021
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,16 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

## [3.2103.2] - 2021-10-07

### Fixed

- fix(compact): close vlog after the compaction at L0 has completed (#1752)
- fix(builder): put the upper limit on reallocation (#1748)
- deps: Bump github.com/google/flatbuffers to v1.12.1 (#1746)
- fix(levels): Avoid a deadlock when acquiring read locks in levels (#1744)
- fix(pubsub): avoid deadlock in publisher and subscriber (#1749) (#1751)

## [3.2103.1] - 2021-07-08

### Fixed
12 changes: 6 additions & 6 deletions db.go
@@ -559,11 +559,6 @@ func (db *DB) close() (err error) {
db.closers.pub.SignalAndWait()
db.closers.cacheHealth.Signal()

// Now close the value log.
if vlogErr := db.vlog.Close(); vlogErr != nil {
err = y.Wrap(vlogErr, "DB.Close")
}

// Make sure that block writer is done pushing stuff into memtable!
// Otherwise, you will have a race condition: we are trying to flush memtables
// and remove them completely, while the block / memtable writer is still
@@ -619,6 +614,11 @@ func (db *DB) close() (err error) {
}
}

// Now close the value log.
if vlogErr := db.vlog.Close(); vlogErr != nil {
err = y.Wrap(vlogErr, "DB.Close")
}

db.opt.Infof(db.LevelsToString())
if lcErr := db.lc.close(); err == nil {
err = y.Wrap(lcErr, "DB.Close")
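
The two db.go hunks above move db.vlog.Close() from early in close() to after the memtable writer has been stopped and the memtables flushed, so that (per the #1752 changelog entry) the value log is only closed once the L0 compaction work is finished. A minimal, standalone sketch of that ordering follows; every function in it is a hypothetical stand-in, not one of Badger's real helpers.

```go
package main

import "fmt"

// Hypothetical stand-ins for Badger's internals; only the ordering matters here.
func stopWriter()     { fmt.Println("writer stopped") }    // no new entries can reach the memtables
func flushMemtables() { fmt.Println("memtables flushed") } // flushing may still use the value log
func compactL0()      { fmt.Println("L0 compacted") }      // runs when CompactL0OnClose is set
func closeValueLog() error {
	fmt.Println("value log closed")
	return nil
}
func closeLevels() { fmt.Println("level controller closed") }

func main() {
	stopWriter()
	flushMemtables()
	compactL0()
	// The value log is closed only after the work above has finished,
	// instead of right after the writer stops (the #1752 fix).
	if err := closeValueLog(); err != nil {
		fmt.Println("close error:", err)
	}
	closeLevels()
}
```
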
@@ -1887,7 +1887,7 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
drain := func() {
for {
select {
case <- s.sendCh:
case <-s.sendCh:
default:
return
}
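
The Subscribe hunk above only adjusts formatting, but the drain closure it belongs to shows the non-blocking pattern used to empty the subscriber's channel: keep receiving inside a select and return from the default case once nothing is buffered. A tiny standalone sketch of that pattern, with an illustrative channel type:

```go
package main

import "fmt"

// drain empties a buffered channel without blocking: each iteration either
// receives a pending value or hits the default case and returns.
func drain(ch chan int) {
	for {
		select {
		case v := <-ch:
			fmt.Println("dropped", v)
		default:
			return
		}
	}
}

func main() {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2
	drain(ch)
	fmt.Println("remaining:", len(ch)) // prints 0
}
```
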
31 changes: 30 additions & 1 deletion db_test.go
@@ -2088,7 +2088,7 @@ func TestVerifyChecksum(t *testing.T) {
y.Check2(rand.Read(value))
st := 0

buf := z.NewBuffer(10 << 20, "test")
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
for i := 0; i < 1000; i++ {
key := make([]byte, 8)
@@ -2509,3 +2509,32 @@ func TestBannedAtZeroOffset(t *testing.T) {
require.NoError(t, err)
})
}

func TestCompactL0OnClose(t *testing.T) {
opt := getTestOptions("")
opt.CompactL0OnClose = true
opt.ValueThreshold = 1 // Every value goes to value log
opt.NumVersionsToKeep = 1
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
var keys [][]byte
val := make([]byte, 1<<12)
for i := 0; i < 10; i++ {
key := make([]byte, 40)
_, err := rand.Read(key)
require.NoError(t, err)
keys = append(keys, key)

err = db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry(key, val))
})
require.NoError(t, err)
}

for _, key := range keys {
err := db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry(key, val))
})
require.NoError(t, err)
}
})
}
2 changes: 1 addition & 1 deletion go.mod
@@ -11,7 +11,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.3.1
github.com/golang/snappy v0.0.3
github.com/google/flatbuffers v1.12.0
github.com/google/flatbuffers v1.12.1
github.com/google/go-cmp v0.5.4 // indirect
github.com/klauspost/compress v1.12.3
github.com/kr/pretty v0.1.0 // indirect
4 changes: 2 additions & 2 deletions go.sum
@@ -34,8 +34,8 @@ github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/flatbuffers v1.12.0 h1:/PtAHvnBY4Kqnx/xCQ3OIV9uYcSFGScBsWI3Oogeh6w=
github.com/google/flatbuffers v1.12.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw=
github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
18 changes: 16 additions & 2 deletions levels.go
@@ -1119,8 +1119,22 @@ func (s *levelsController) fillTablesL0ToL0(cd *compactDef) bool {
cd.nextRange = keyRange{}
cd.bot = nil

cd.lockLevels()
defer cd.unlockLevels()
// Because this level and next level are both level 0, we should NOT acquire
// the read lock twice, because it can result in a deadlock. So, we don't
// call compactDef.lockLevels, instead locking the level only once and
// directly here.
//
// As per godocs on RWMutex:
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually
// becomes available; a blocked Lock call excludes new readers from
// acquiring the lock.
y.AssertTrue(cd.thisLevel.level == 0)
y.AssertTrue(cd.nextLevel.level == 0)
s.levels[0].RLock()
defer s.levels[0].RUnlock()

s.cstatus.Lock()
defer s.cstatus.Unlock()
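
The comment added above restates the sync.RWMutex guarantee: once a writer is blocked in Lock, new readers cannot acquire the lock, so taking the same level-0 read lock twice on one call path can deadlock. A small self-contained sketch of that hazard, with illustrative timings (not Badger code):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.RWMutex

	mu.RLock() // first read lock, e.g. locking "thisLevel" (level 0)

	go func() {
		mu.Lock() // a writer arrives and waits for the current reader
		mu.Unlock()
	}()
	time.Sleep(100 * time.Millisecond) // let the writer queue up

	acquired := make(chan struct{})
	go func() {
		mu.RLock() // second read lock, e.g. "nextLevel" (also level 0)
		mu.RUnlock()
		close(acquired)
	}()

	select {
	case <-acquired:
		fmt.Println("second read lock acquired")
	case <-time.After(time.Second):
		// In fillTablesL0ToL0 both read locks would sit on the same call path, so
		// the first one would never be released and this wait would last forever,
		// which is why the hunk above locks levels[0] exactly once.
		fmt.Println("second RLock is stuck behind the pending Lock")
	}

	mu.RUnlock() // releasing the first read lock lets everything finish
}
```
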
6 changes: 5 additions & 1 deletion table/builder.go
@@ -100,8 +100,12 @@ type Builder struct {
func (b *Builder) allocate(need int) []byte {
bb := b.curBlock
if len(bb.data[bb.end:]) < need {
// We need to reallocate.
// We need to reallocate. 1GB is the max size that the allocator can allocate.
// While reallocating, if doubling exceeds that limit, then put the upper bound on it.
sz := 2 * len(bb.data)
if sz > (1 << 30) {
sz = 1 << 30
}
if bb.end+need > sz {
sz = bb.end + need
}
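
The hunk above caps the doubled buffer size at 1GB, which the new comment identifies as the allocator's maximum, while still growing to exactly bb.end+need when that exceeds the clamped size. A standalone sketch of that growth policy; nextSize and maxAlloc are illustrative names, not Badger's:

```go
package main

import "fmt"

const maxAlloc = 1 << 30 // 1GB, the allocator ceiling mentioned in the comment above

// nextSize mirrors the reallocation policy in the hunk: double the current size,
// clamp the doubled size to 1GB, then bump to end+need if that still falls short.
func nextSize(cur, end, need int) int {
	sz := 2 * cur
	if sz > maxAlloc {
		sz = maxAlloc
	}
	if end+need > sz {
		sz = end + need
	}
	return sz
}

func main() {
	fmt.Println(nextSize(1<<20, 1<<20, 64))    // small buffer: plain doubling -> 2 MiB
	fmt.Println(nextSize(1<<30, 1<<29, 128))   // doubling clamped to 1 GiB, which is enough
	fmt.Println(nextSize(1<<30, 1<<30, 1<<20)) // clamp too small: grow to exactly end+need
}
```
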