Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: rollup tests similar to Dgraph #1994

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
203 changes: 203 additions & 0 deletions rollup_test/main.go
@@ -0,0 +1,203 @@
package main

import (
"fmt"
"log"
"sync"
"sync/atomic"
"time"

badger "github.com/dgraph-io/badger/v4"
)

var atomicTs int64

func adder(db *badger.DB) {

key := "test"

for i := int32(0); i < 20; i++ {
time.Sleep(10 * time.Millisecond)

var curr delta
// if i < 10 {
curr.set_op = true
curr.del_op = false
curr.consolidated = false
curr.last_version = 0
curr.values = append(curr.values, i)
// } else {
// curr.set_op = false
// curr.del_op = true
// curr.consolidated = false
// curr.last_version = 0
// curr.values = append(curr.values, i, i-10)
// }

ts := atomic.LoadInt64(&atomicTs)
fmt.Printf("Adding delta: %d. At time: %d\n", i, ts+1)

txn := db.NewTransactionAt(uint64(ts), true)

currBytes, _ := structToBytes(curr)
txn.Set([]byte(key), currBytes)

Check failure on line 43 in rollup_test/main.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `txn.Set` is not checked (errcheck)

var wg sync.WaitGroup
wg.Add(1)

txn.CommitAt(uint64(ts+1), func(err error) {

Check failure on line 48 in rollup_test/main.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `txn.CommitAt` is not checked (errcheck)
atomic.AddInt64(&atomicTs, 2)
wg.Done()
})

wg.Wait()

}
}

func getter(db *badger.DB) {

Check failure on line 58 in rollup_test/main.go

View workflow job for this annotation

GitHub Actions / lint

func `getter` is unused (unused)

for i := 0; i < 10; i++ {
time.Sleep(200 * time.Millisecond)
fmt.Printf("Getting value at time: %d\n", time.Now().UnixMilli())

_ = db.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
prefix := []byte("keylatestmerge_")

for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
k := item.Key()
keyname := k[len("keylatestmerge_"):]

var list delta
err := item.Value(func(val []byte) error {
var err error
list, err = bytesToStruct(val)
return err
})
if err != nil {
return err
}

fmt.Printf("Key: %s...\n", keyname)
for _, nums := range list.values {
fmt.Printf("%d, ", nums)
}
fmt.Println("")
}
return nil
})
}
}

func merger(db *badger.DB) {
// skipCounter := 0
time.Sleep(2 * time.Millisecond)
for i := 0; i < 20; i++ {
time.Sleep(20 * time.Millisecond)

ts := atomic.LoadInt64(&atomicTs)
if ts < 2 {
continue
}
fmt.Printf("Merging deltas till now at time: %d\n", ts)

txn := db.NewTransactionAt(uint64(ts), false)

opt := badger.DefaultIteratorOptions
// opt.Reverse = true

it := txn.NewKeyIterator([]byte("test"), opt)
defer it.Close()

// earliestTimestamp := 0
var consolidated delta

consolidated.consolidated = true
consolidated.set_op = false
consolidated.del_op = false
consolidated.last_version = 0

var allDeltas []delta
latestVersion := uint64(0)

for it.Seek([]byte("test")); it.Valid(); it.Next() {
item := it.Item()
version := item.Version()

if version > latestVersion {
latestVersion = version
}

// if version <= uint64(earliestTimestamp) {
// break
// }

valCopy, err := item.ValueCopy(nil)
if err != nil {
continue
}

itemValue, _ := bytesToStruct(valCopy)

// fmt.Println("Delta item value: ", itemValue, )
fmt.Printf("%d: ", version)
for _, num := range itemValue.values {
fmt.Printf("%d, ", num)
}
fmt.Println("")

if itemValue.consolidated == true {
// earliestTimestamp = int(itemValue.last_version)
consolidated = itemValue
break
} else {
allDeltas = append(allDeltas, itemValue)
}
}

for i := len(allDeltas) - 1; i >= 0; i-- {
consolidated = mergeDelta(consolidated, allDeltas[i])
}
consolidated.last_version = int32(latestVersion)
fmt.Println(consolidated)
consolidatedBytes, _ := structToBytes(consolidated)
it.Close()

txn.Discard()

var wg sync.WaitGroup
wg.Add(1)

txn = db.NewTransactionAt(uint64(ts), true)
defer txn.Discard()
txn.Set([]byte("test"), consolidatedBytes)

Check failure on line 176 in rollup_test/main.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `txn.Set` is not checked (errcheck)
txn.CommitAt(uint64(latestVersion), func(err error) {

Check failure on line 177 in rollup_test/main.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `txn.CommitAt` is not checked (errcheck)
if err == nil {
wg.Done()
} else {
log.Fatal(err)
}
})

wg.Wait()

fmt.Printf("Committed rollup %d at %d\n", ts, latestVersion)
}
}

func main() {
db, err := badger.OpenManaged(badger.DefaultOptions("/tmp/badger"))
if err != nil {
log.Fatal(err)
}
defer db.Close()

atomicTs = 1
go adder(db)
merger(db)

// getter(db)
}
160 changes: 160 additions & 0 deletions rollup_test/utils.go
@@ -0,0 +1,160 @@
package main

import (
"bytes"
"encoding/binary"
"fmt"

"github.com/dgraph-io/badger/v4"
)

type delta struct {
set_op, del_op, consolidated bool
last_version int32
values []int32
}

func structToBytes(d delta) ([]byte, error) {
buf := new(bytes.Buffer)

err := binary.Write(buf, binary.LittleEndian, d.set_op)
if err != nil {
return nil, err
}

err = binary.Write(buf, binary.LittleEndian, d.del_op)
if err != nil {
return nil, err
}

err = binary.Write(buf, binary.LittleEndian, d.consolidated)
if err != nil {
return nil, err
}

err = binary.Write(buf, binary.LittleEndian, d.last_version)
if err != nil {
return nil, err
}

err = binary.Write(buf, binary.LittleEndian, int32(len(d.values)))
if err != nil {
return nil, err
}

for _, v := range d.values {
err = binary.Write(buf, binary.LittleEndian, v)
if err != nil {
return nil, err
}
}

return buf.Bytes(), nil
}

func bytesToStruct(data []byte) (delta, error) {
var d delta

buf := bytes.NewReader(data)

err := binary.Read(buf, binary.LittleEndian, &d.set_op)
if err != nil {
return delta{}, err
}

err = binary.Read(buf, binary.LittleEndian, &d.del_op)
if err != nil {
return delta{}, err
}

err = binary.Read(buf, binary.LittleEndian, &d.consolidated)
if err != nil {
return delta{}, err
}

err = binary.Read(buf, binary.LittleEndian, &d.last_version)
if err != nil {
return delta{}, err
}

var valuesLen int32
err = binary.Read(buf, binary.LittleEndian, &valuesLen)
if err != nil {
return delta{}, err
}

d.values = make([]int32, valuesLen)
for i := 0; i < int(valuesLen); i++ {
err = binary.Read(buf, binary.LittleEndian, &d.values[i])
if err != nil {
return delta{}, err
}
}

return d, nil
}

func mergeDelta(existing, delta delta) delta {
if delta.del_op {
// fmt.Println("Del")
list2Map := make(map[int32]bool)
for _, num := range delta.values {
list2Map[num] = true
}

i := 0
for _, num := range existing.values {
if !list2Map[num] {
existing.values[i] = num
i++
fmt.Printf("%d ", num)
}
}

existing.values = existing.values[:i]
// fmt.Println("Del fin.")
}

if delta.set_op {
// fmt.Println("Set")
existing.values = append(existing.values, delta.values...)
}

return existing
}

func getList(key string, db *badger.DB) delta {

Check failure on line 126 in rollup_test/utils.go

View workflow job for this annotation

GitHub Actions / lint

func `getList` is unused (unused)
var valCopy []byte
db.View(func(txn *badger.Txn) error {

Check failure on line 128 in rollup_test/utils.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `db.View` is not checked (errcheck)
item, err := txn.Get([]byte(key))
if err != nil {
return nil
}
valCopy, err = item.ValueCopy(nil)
if err != nil {
return nil
}
return nil
})

ret, _ := bytesToStruct(valCopy)
return ret
}

func addTimestampToStringBytes(s string, timestamp int64) []byte {

Check failure on line 144 in rollup_test/utils.go

View workflow job for this annotation

GitHub Actions / lint

func `addTimestampToStringBytes` is unused (unused)
buf := new(bytes.Buffer)
_ = binary.Write(buf, binary.BigEndian, timestamp)
return append([]byte(s), buf.Bytes()...)
}

func byteToKeyAndTimestamp(b []byte) (string, int64) {

Check failure on line 150 in rollup_test/utils.go

View workflow job for this annotation

GitHub Actions / lint

func `byteToKeyAndTimestamp` is unused (unused)
length := len(b)
ret_key := b[:length-8]

buf := bytes.NewReader(b[length-8:])
var ret_ts int64
binary.Read(buf, binary.BigEndian, &ret_ts)

Check failure on line 156 in rollup_test/utils.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `binary.Read` is not checked (errcheck)

ret := fmt.Sprintf("%s%d", ret_key, ret_ts)
return ret, ret_ts
}