-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added get multiple api for badger #1990
base: main
Are you sure you want to change the base?
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -745,6 +745,8 @@ func (db *DB) getMemTables() ([]*memTable, func()) { | |
// get returns the value in memtable or disk for given key. | ||
// Note that value will include meta byte. | ||
// | ||
// getBatch returns the values for a list of keys, in the same order as the keys. | ||
// | ||
// IMPORTANT: We should never write an entry with an older timestamp for the same key. We need to | ||
// maintain this invariant to search for the latest value of a key, or else we need to search in all | ||
// tables and find the max version among them. To maintain this invariant, we also need to ensure | ||
|
@@ -756,6 +758,44 @@ func (db *DB) getMemTables() ([]*memTable, func()) { | |
// do that. For every get("fooX") call where X is the version, we will search | ||
// for "fooX" in all the levels of the LSM tree. This is expensive but it | ||
// removes the overhead of handling move keys completely. | ||
func (db *DB) getBatch(keys [][]byte, done []bool) ([]y.ValueStruct, error) { | ||
if db.IsClosed() { | ||
return []y.ValueStruct{}, ErrDBClosed | ||
} | ||
tables, decr := db.getMemTables() // Lock should be released. | ||
defer decr() | ||
|
||
maxVs := make([]y.ValueStruct, len(keys)) | ||
|
||
y.NumGetsAdd(db.opt.MetricsEnabled, 1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we are using the old metric, we should increment this by |
||
// For memtable, we need to check every memtable each time | ||
for j, key := range keys { | ||
if done[j] { | ||
continue | ||
} | ||
version := y.ParseTs(key) | ||
for i := 0; i < len(tables); i++ { | ||
vs := tables[i].sl.Get(key) | ||
y.NumMemtableGetsAdd(db.opt.MetricsEnabled, 1) | ||
if vs.Meta == 0 && vs.Value == nil { | ||
continue | ||
} | ||
// Found the required version of the key, mark as done, no need to process | ||
// it further | ||
if vs.Version == version { | ||
y.NumGetsWithResultsAdd(db.opt.MetricsEnabled, 1) | ||
maxVs[j] = vs | ||
done[j] = true | ||
break | ||
} | ||
if maxVs[j].Version < vs.Version { | ||
maxVs[j] = vs | ||
} | ||
} | ||
} | ||
return db.lc.getBatch(keys, maxVs, 0, done) | ||
} | ||
|
||
func (db *DB) get(key []byte) (y.ValueStruct, error) { | ||
if db.IsClosed() { | ||
return y.ValueStruct{}, ErrDBClosed | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,12 +17,169 @@ | |
package badger | ||
|
||
import ( | ||
"bufio" | ||
"bytes" | ||
"fmt" | ||
"os" | ||
"sort" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/pkg/profile" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
// farEnough compares itrKey and key byte by byte over the length of the shorter
// one. It returns 0 when no byte differs within that range; otherwise it returns
// the shorter length minus the index of the first differing byte.
func farEnough(itrKey, key []byte) int {
	limit := len(itrKey)
	if len(key) < limit {
		limit = len(key)
	}

	for pos, b := range itrKey[:limit] {
		if b != key[pos] {
			return limit - pos
		}
	}
	return 0
}
|
||
// ByteSliceArray is a list of byte-slice keys. It implements sort.Interface and
// orders the keys lexicographically, smallest first.
type ByteSliceArray [][]byte

// Len returns the number of keys in the ByteSliceArray.
func (b ByteSliceArray) Len() int {
	return len(b)
}

// Less reports whether the key at index i sorts strictly before the key at
// index j.
func (b ByteSliceArray) Less(i, j int) bool {
	return bytesLessThan(b[i], b[j])
}

// Swap exchanges the keys at indices i and j.
func (b ByteSliceArray) Swap(i, j int) {
	b[i], b[j] = b[j], b[i]
}

// bytesLessThan reports whether a is lexicographically less than b.
//
// The comparison is strict: equal inputs yield false. sort.Interface requires
// this of Less; a comparator that returns true for equal elements is not a
// valid ordering.
func bytesLessThan(a, b []byte) bool {
	return bytes.Compare(a, b) < 0
}
|
||
func TestReadC(t *testing.T) { | ||
allKeysF, err := os.Open("/home/harshil/all_keys_2") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bit weird to have an explicit user path, make it a temp path or something? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This s just a temp test for my local validation. Hence the t.Skip() before the test starts. I can't commit this as the directory being used is quite big. |
||
require.NoError(t, err) | ||
defer allKeysF.Close() | ||
|
||
scanner := bufio.NewScanner(allKeysF) | ||
// optionally, resize scanner's capacity for lines over 64K, see next example | ||
keysList := [][]byte{} | ||
for scanner.Scan() { | ||
f := strings.Fields(scanner.Text()) | ||
b := []byte{} | ||
for _, c := range f { | ||
ic, err := strconv.Atoi(c) | ||
require.NoError(t, err) | ||
b = append(b, uint8(ic)) | ||
} | ||
keysList = append(keysList, b) | ||
} | ||
|
||
dir := "/home/harshil/data/p/" | ||
opt := DefaultOptions(dir) | ||
opt.managedTxns = true | ||
opt.Compression = 0 | ||
opt.IndexCacheSize = 0 | ||
db, err := Open(opt) | ||
require.NoError(t, err) | ||
|
||
numCh := 64 | ||
numPer := len(keysList) / numCh | ||
|
||
var wg sync.WaitGroup | ||
defer profile.Start(profile.CPUProfile).Stop() | ||
|
||
s := 0 | ||
|
||
calculateS := func(start int) { | ||
m := 0 | ||
|
||
for i := start * numPer; i < start*numPer+numPer; i += 1 { | ||
txn := db.NewTransactionAt(270005, false) | ||
|
||
key := keysList[i] | ||
item, err := txn.Get(key) | ||
require.NoError(t, err) | ||
|
||
item.Value(func(val []byte) error { | ||
m += len(val) + len(key) | ||
return nil | ||
}) | ||
txn.Discard() | ||
} | ||
wg.Done() | ||
s += m | ||
} | ||
|
||
calculate := func(start int) { | ||
m := 0 | ||
|
||
num := 500 | ||
for i := start * numPer; i < start*numPer+numPer; i += num { | ||
txn := db.NewTransactionAt(270005, false) | ||
|
||
keys := ByteSliceArray{} | ||
for j := i; j < start*numPer+numPer && j < i+num; j++ { | ||
keys = append(keys, keysList[j]) | ||
} | ||
sort.Sort(keys) | ||
items, err := txn.GetBatch(keys) | ||
require.NoError(t, err) | ||
|
||
for j, item := range items { | ||
item.Value(func(val []byte) error { | ||
m += len(val) + len(keys[j]) | ||
return nil | ||
}) | ||
} | ||
txn.Discard() | ||
} | ||
wg.Done() | ||
s += m | ||
} | ||
|
||
t1 := time.Now() | ||
for i := 0; i < numCh; i++ { | ||
wg.Add(1) | ||
go func(startPos int) { | ||
calculateS(startPos) | ||
}(i) | ||
} | ||
|
||
wg.Wait() | ||
fmt.Println(time.Since(t1), s) | ||
|
||
s = 0 | ||
t1 = time.Now() | ||
for i := 0; i < numCh; i++ { | ||
wg.Add(1) | ||
go func(startPos int) { | ||
calculate(startPos) | ||
}(i) | ||
} | ||
|
||
wg.Wait() | ||
|
||
fmt.Println(time.Since(t1), s) | ||
} | ||
|
||
func TestDiscardStats(t *testing.T) { | ||
dir, err := os.MkdirTemp("", "badger-test") | ||
require.NoError(t, err) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: can we rename `done` to `keyRead` or something similar?