Skip to content

Commit

Permalink
add write capacity options
Browse files Browse the repository at this point in the history
  • Loading branch information
mYmNeo committed Feb 13, 2023
1 parent a3bfc41 commit 6a32b8a
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
11 changes: 4 additions & 7 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,6 @@ type DB struct {
pub *publisher
}

const (
kvWriteChCapacity = 1000
)

func (db *DB) replayFunction() func(Entry, valuePointer) error {
type txnEntry struct {
nk []byte
Expand Down Expand Up @@ -282,7 +278,7 @@ func Open(opt Options) (db *DB, err error) {
db = &DB{
imm: make([]*skl.Skiplist, 0, opt.NumMemtables),
flushChan: make(chan flushTask, opt.NumMemtables),
writeCh: make(chan *request, kvWriteChCapacity),
writeCh: make(chan *request, opt.KVWriteCapacity),
opt: opt,
manifest: manifestFile,
elog: elog,
Expand Down Expand Up @@ -344,7 +340,7 @@ func Open(opt Options) (db *DB, err error) {
db.orc.readMark.Done(db.orc.nextTxnTs)
db.orc.incrementNextTs()

db.writeCh = make(chan *request, kvWriteChCapacity)
db.writeCh = make(chan *request, opt.KVWriteCapacity)
db.closers.writes = y.NewCloser(1)
go db.doWrites(db.closers.writes)

Expand Down Expand Up @@ -710,6 +706,7 @@ func (db *DB) doWrites(lc *y.Closer) {
reqLen := new(expvar.Int)
y.PendingWrites.Set(db.opt.Dir, reqLen)

writeCapacity := 3 * db.opt.KVWriteCapacity
reqs := make([]*request, 0, 10)
for {
var r *request
Expand All @@ -723,7 +720,7 @@ func (db *DB) doWrites(lc *y.Closer) {
reqs = append(reqs, r)
reqLen.Set(int64(len(reqs)))

if len(reqs) >= 3*kvWriteChCapacity {
if len(reqs) >= writeCapacity {
pendingCh <- struct{}{} // blocking.
goto writeCase
}
Expand Down
12 changes: 12 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type Options struct {
// the same directory. Use this options with caution.
BypassLockGuard bool

// KVWriteCapacity defines the capacity channel size for write
KVWriteCapacity int

// Transaction start and commit timestamps are managed by end-user.
// This is only useful for databases built on top of Badger (like Dgraph).
// Not recommended for most users.
Expand Down Expand Up @@ -118,6 +121,7 @@ func DefaultOptions(path string) Options {
Logger: defaultLogger,
EventLogging: true,
LogRotatesToFlush: 2,
KVWriteCapacity: 1000,
}
}

Expand Down Expand Up @@ -419,3 +423,11 @@ func (opt Options) WithBypassLockGuard(b bool) Options {
opt.BypassLockGuard = b
return opt
}

// WithWriteCapacity returns a new Options value with WriteCapacity set to the given
//
// When write channel is full, Badger will block until write channel is flushed out.
func (opt Options) WithWriteCapacity(writeCapacity int) Options {
opt.KVWriteCapacity = writeCapacity
return opt
}
4 changes: 2 additions & 2 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func newOracle(opt Options) *oracle {
txnMark: &y.WaterMark{Name: "badger.TxnTimestamp"},
closer: y.NewCloser(2),
}
orc.readMark.Init(orc.closer, opt.EventLogging, kvWriteChCapacity)
orc.txnMark.Init(orc.closer, opt.EventLogging, kvWriteChCapacity)
orc.readMark.Init(orc.closer, opt.EventLogging, opt.KVWriteCapacity)
orc.txnMark.Init(orc.closer, opt.EventLogging, opt.KVWriteCapacity)
return orc
}

Expand Down

0 comments on commit 6a32b8a

Please sign in to comment.