fix(PanicHandling): Add Options.PanicHandler #2033

Open · wants to merge 1 commit into main
77 changes: 61 additions & 16 deletions db.go
@@ -25,6 +25,7 @@ import (
"math"
"os"
"path/filepath"
"runtime/debug"
"sort"
"strings"
"sync"
@@ -185,6 +186,15 @@ func checkAndSetOptions(opt *Options) error {

// Open returns a new DB object.
func Open(opt Options) (*DB, error) {
if opt.PanicHandler != nil {
defer func() {
if r := recover(); r != nil {
r = fmt.Sprintf("%v\n%s", r, string(debug.Stack()))
opt.PanicHandler(r)
}
}()
}

if err := checkAndSetOptions(&opt); err != nil {
return nil, err
}
@@ -310,7 +320,9 @@ func Open(opt Options) (*DB, error) {
}

db.closers.cacheHealth = z.NewCloser(1)
go db.monitorCache(db.closers.cacheHealth)
db._go(func() {
db.monitorCache(db.closers.cacheHealth)
})

if db.opt.InMemory {
db.opt.SyncWrites = false
@@ -330,7 +342,9 @@ func Open(opt Options) (*DB, error) {
}
db.calculateSize()
db.closers.updateSize = z.NewCloser(1)
go db.updateSize(db.closers.updateSize)
db._go(func() {
db.updateSize(db.closers.updateSize)
})

if err := db.openMemTables(db.opt); err != nil {
return nil, y.Wrapf(err, "while opening memtables")
@@ -355,9 +369,10 @@ func Open(opt Options) (*DB, error) {
db.lc.startCompact(db.closers.compactors)

db.closers.memtable = z.NewCloser(1)
go func() {
db._go(func() {
db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
}()
})

// Flush them to disk asap.
for _, mt := range db.imm {
db.flushChan <- mt
@@ -379,22 +394,28 @@ func Open(opt Options) (*DB, error) {
db.orc.readMark.Done(db.orc.nextTxnTs)
db.orc.incrementNextTs()

go db.threshold.listenForValueThresholdUpdate()
db._go(db.threshold.listenForValueThresholdUpdate)

if err := db.initBannedNamespaces(); err != nil {
return db, errors.Wrapf(err, "While setting banned keys")
}

db.closers.writes = z.NewCloser(1)
go db.doWrites(db.closers.writes)
db._go(func() {
db.doWrites(db.closers.writes)
})

if !db.opt.InMemory {
db.closers.valueGC = z.NewCloser(1)
go db.vlog.waitOnGC(db.closers.valueGC)
db._go(func() {
db.vlog.waitOnGC(db.closers.valueGC)
})
}

db.closers.pub = z.NewCloser(1)
go db.pub.listenForUpdates(db.closers.pub)
db._go(func() {
db.pub.listenForUpdates(db.closers.pub)
})

valueDirLockGuard = nil
dirLockGuard = nil
@@ -975,7 +996,10 @@ func (db *DB) doWrites(lc *z.Closer) {
}

writeCase:
go writeRequests(reqs)
_reqs := reqs
db._go(func() {
writeRequests(_reqs)
})
reqs = make([]*request, 0, 10)
reqLen.Set(0)
}
@@ -1006,11 +1030,11 @@ func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error {
if err != nil {
return err
}
go func() {
db._go(func() {
err := req.Wait()
// Write is complete. Let's call the callback function now.
f(err)
}()
})
return nil
}

@@ -1556,9 +1580,9 @@ func (db *DB) startMemoryFlush() {
if db.closers.memtable != nil {
db.flushChan = make(chan *memTable, db.opt.NumMemtables)
db.closers.memtable = z.NewCloser(1)
go func() {
db._go(func() {
db.flushMemtable(db.closers.memtable)
}()
})
}
}

@@ -1576,9 +1600,9 @@ func (db *DB) Flatten(workers int) error {
db.opt.Infof("Attempting to compact with %+v\n", cp)
errCh := make(chan error, 1)
for i := 0; i < workers; i++ {
go func() {
db._go(func() {
errCh <- db.lc.doCompact(175, cp)
}()
})
}
var success int
var rerr error
@@ -1649,7 +1673,9 @@ func (db *DB) blockWrite() error {

func (db *DB) unblockWrite() {
db.closers.writes = z.NewCloser(1)
go db.doWrites(db.closers.writes)
db._go(func() {
db.doWrites(db.closers.writes)
})

// Resume writes.
db.blockWrites.Store(0)
@@ -2085,3 +2111,22 @@ func (db *DB) LevelsToString() string {
b.WriteString("Level Done\n")
return b.String()
}

// _go starts fn in a goroutine but also handles panics by forwarding
// them, along with a stack trace, to the PanicHandler if one is provided in Options.
func (db *DB) _go(fn func()) {
if db.opt.PanicHandler == nil {
go fn()
return
}

go func() {
defer func() {
if r := recover(); r != nil {
r = fmt.Sprintf("%v\n%s", r, string(debug.Stack()))
db.opt.PanicHandler(r)
}
}()
fn()
}()
}
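
For reviewers skimming the diff, the helper above reduces to the following pattern. This is a minimal, self-contained sketch of the same idea; runGuarded and the handler are illustrative names, not part of this change:

package main

import (
	"fmt"
	"runtime/debug"
)

// runGuarded mirrors the db._go pattern: run fn on its own goroutine and,
// if it panics, pass the payload plus a stack trace to handler instead of
// letting the process crash.
func runGuarded(fn func(), handler func(interface{})) {
	if handler == nil {
		// No handler configured: behave like a plain `go` statement.
		go fn()
		return
	}
	go func() {
		defer func() {
			if r := recover(); r != nil {
				handler(fmt.Sprintf("%v\n%s", r, debug.Stack()))
			}
		}()
		fn()
	}()
}

func main() {
	done := make(chan struct{})
	runGuarded(
		func() { panic("simulated background failure") },
		func(p interface{}) {
			fmt.Printf("recovered panic:\n%v\n", p)
			close(done)
		},
	)
	<-done // wait for the handler before exiting
}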
26 changes: 17 additions & 9 deletions levels.go
@@ -112,6 +112,7 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
defer tick.Stop()

for fileID, tf := range mf.Tables {
tf := tf
fname := table.NewFilename(fileID, db.opt.Dir)
select {
case <-tick.C:
@@ -126,7 +127,7 @@
if fileID > maxFileID {
maxFileID = fileID
}
go func(fname string, tf TableManifest) {
db._go(func() {
var rerr error
defer func() {
throttle.Done(rerr)
@@ -162,7 +163,7 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
mu.Lock()
tables[tf.Level] = append(tables[tf.Level], t)
mu.Unlock()
}(fname, tf)
})
}
if err := throttle.Finish(); err != nil {
closeAllTables(tables)
@@ -351,7 +352,10 @@ func (s *levelsController) startCompact(lc *z.Closer) {
n := s.kv.opt.NumCompactors
lc.AddRunning(n - 1)
for i := 0; i < n; i++ {
go s.runCompactor(i, lc)
i := i
s.kv._go(func() {
s.runCompactor(i, lc)
})
Comment on lines +356 to +358 (Author):

s.kv is an instance of DB, so I assumed it was safe to start the Goroutine using this instance. But I am not 100% sure this instance is directly derived from the DB instance the user is using, so tell me if this is a mistake.

}
}

@@ -846,7 +850,9 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
// Can't return from here, until I decrRef all the tables that I built so far.
break
}
go func(builder *table.Builder, fileID uint64) {

fileID := s.reserveFileID()
s.kv._go(func() {
var err error
defer inflightBuilders.Done(err)
defer builder.Close()
Expand All @@ -864,7 +870,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
return
}
res <- tbl
}(builder, s.reserveFileID())
})
}
s.kv.vlog.updateDiscardStats(discardStats)
s.kv.opt.Debugf("Discard stats: %v", discardStats)
@@ -920,28 +926,30 @@ func (s *levelsController) compactBuildTables(
res := make(chan *table.Table, 3)
inflightBuilders := y.NewThrottle(8 + len(cd.splits))
for _, kr := range cd.splits {
kr := kr

// Initiate Do here so we can register the goroutines for buildTables too.
if err := inflightBuilders.Do(); err != nil {
s.kv.opt.Errorf("cannot start subcompaction: %+v", err)
return nil, nil, err
}
go func(kr keyRange) {
s.kv._go(func() {
defer inflightBuilders.Done(nil)
it := table.NewMergeIterator(newIterator(), false)
defer it.Close()
s.subcompact(it, kr, cd, inflightBuilders, res)
}(kr)
})
}

var newTables []*table.Table
var wg sync.WaitGroup
wg.Add(1)
go func() {
s.kv._go(func() {
defer wg.Done()
for t := range res {
newTables = append(newTables, t)
}
}()
})

// Wait for all table builders to finish and also for newTables accumulator to finish.
err := inflightBuilders.Finish()
12 changes: 12 additions & 0 deletions options.go
@@ -118,6 +118,10 @@ type Options struct {
// with incompatible data format.
ExternalMagicVersion uint16

// PanicHandler, if set, will be called with panic payloads (the panic value plus a stack trace)
// instead of letting the panic propagate.
// It is very important that the panic handler actually handles the panics instead of ignoring them.
PanicHandler func(panicPayload interface{})

// Transaction start and commit timestamps are managed by end-user.
// This is only useful for databases built on top of Badger (like Dgraph).
// Not recommended for most users.
@@ -794,6 +798,14 @@ func (opt Options) WithExternalMagic(magic uint16) Options {
return opt
}

// WithPanicHandler returns a new Options value with PanicHandler set to the given value.
//
// If a PanicHandler is passed, it should actually handle the panic or re-panic.
func (opt Options) WithPanicHandler(panicHandler func(panicPayload interface{})) Options {
opt.PanicHandler = panicHandler
return opt
}

func (opt Options) getFileFlags() int {
var flags int
// opt.SyncWrites would be using msync to sync. All writes go through mmap.
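Assuming the option ships as shown above, a caller could wire it up roughly like this. The import path (Badger v4's module path), the directory, and the logging choice are assumptions for illustration, not part of the diff:

package main

import (
	"log"

	badger "github.com/dgraph-io/badger/v4"
)

func main() {
	// WithPanicHandler is the option added in this PR; the handler receives
	// the panic value concatenated with a stack trace.
	opt := badger.DefaultOptions("/tmp/badger-panic-demo").
		WithPanicHandler(func(p interface{}) {
			// A real handler should surface this (metrics, alerting, re-panic);
			// silently dropping it hides failures in background goroutines.
			log.Printf("badger panic: %v", p)
		})

	db, err := badger.Open(opt)
	if err != nil {
		log.Fatalf("open: %v", err)
	}
	defer db.Close()
}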
13 changes: 8 additions & 5 deletions stream.go
@@ -398,14 +398,17 @@ func (st *Stream) Orchestrate(ctx context.Context) error {
}

// Picks up ranges from Badger, and sends them to rangeCh.
go st.produceRanges(ctx)
st.db._go(func() {
st.produceRanges(ctx)
})

errCh := make(chan error, st.NumGo) // Stores error by consumeKeys.
var wg sync.WaitGroup
for i := 0; i < st.NumGo; i++ {
wg.Add(1)

go func(threadId int) {
threadId := i
st.db._go(func() {
defer wg.Done()
// Picks up ranges from rangeCh, generates KV lists, and sends them to kvChan.
if err := st.produceKVs(ctx, threadId); err != nil {
@@ -414,19 +417,19 @@
default:
}
}
}(i)
})
}

// Pick up key-values from kvChan and send to stream.
kvErr := make(chan error, 1)
go func() {
st.db._go(func() {
// Picks up KV lists from kvChan, and sends them to Output.
err := st.streamKVs(ctx)
if err != nil {
cancel() // Stop all the go routines.
}
kvErr <- err
}()
})
wg.Wait() // Wait for produceKVs to be over.
close(st.kvChan) // Now we can close kvChan.
defer func() {
9 changes: 6 additions & 3 deletions stream_writer.go
@@ -377,7 +377,7 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) {
level: sw.prevLevel - 1, // Write at the level just above the one we were writing to.
}

go w.handleRequests()
w.db._go(w.handleRequests)
return w, nil
}

@@ -454,10 +454,13 @@ func (w *sortedWriter) send(done bool) error {
if err := w.throttle.Do(); err != nil {
return err
}
go func(builder *table.Builder) {

builder := w.builder
w.db._go(func() {
err := w.createTable(builder)
w.throttle.Done(err)
}(w.builder)
})

// If done is true, this indicates we can close the writer.
// No need to allocate underlying TableBuilder now.
if done {