Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Txn): serializable behavior is broken #2057 #2058

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 30 additions & 5 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ type oracle struct {
type committedTxn struct {
ts uint64
// ConflictKeys Keeps track of the entries written at timestamp ts.
conflictKeys map[uint64]struct{}
// The map key is the hash of the written key. The map value is an offset
// into the reads slice marking where this write was inserted.
conflictKeys map[uint64]int
}

func newOracle(opt Options) *oracle {
Expand Down Expand Up @@ -151,11 +153,33 @@ func (o *oracle) hasConflict(txn *Txn) bool {
continue
}

// check if there is any direct conflict
for _, ro := range txn.reads {
if _, has := committedTxn.conflictKeys[ro]; has {
return true
}
}

// check if there is any indirect conflict (logical conflict)
for _, i := range txn.conflictKeys {
if i == -1 {
continue
}

happensBeforeReads := txn.reads[:i]

for _, j := range committedTxn.conflictKeys {
otherHappensBeforeReads := txn.reads[:j]

for fp := range happensBeforeReads {
for otherFp := range otherHappensBeforeReads {
if fp == otherFp {
return true
}
}
}
}
}
}

return false
Expand Down Expand Up @@ -256,7 +280,8 @@ type Txn struct {

reads []uint64 // contains fingerprints of keys read.
// contains fingerprints of keys written. This is used for conflict detection.
conflictKeys map[uint64]struct{}
// The int value is len(reads) - 1 at the time the write was inserted
// (-1 when nothing had been read before the write).
conflictKeys map[uint64]int
readsLock sync.Mutex // guards the reads slice. See addReadKey.

pendingWrites map[string]*Entry // cache stores any writes done by txn.
Expand Down Expand Up @@ -393,8 +418,8 @@ func (txn *Txn) modify(e *Entry) error {
// The txn.conflictKeys is used for conflict detection. If conflict detection
// is disabled, we don't need to store key hashes in this map.
if txn.db.opt.DetectConflicts {
fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
txn.conflictKeys[fp] = struct{}{}
fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
txn.conflictKeys[fp] = len(txn.reads) - 1 // store this write happens after how many reads
}
// If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice.
// Add the entry to duplicateWrites only if both the entries have different versions. For
Expand Down Expand Up @@ -783,7 +808,7 @@ func (db *DB) newTransaction(update, isManaged bool) *Txn {
}
if update {
if db.opt.DetectConflicts {
txn.conflictKeys = make(map[uint64]struct{})
txn.conflictKeys = make(map[uint64]int)
}
txn.pendingWrites = make(map[string]*Entry)
}
Expand Down
58 changes: 58 additions & 0 deletions txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,64 @@
})
}

// https://wiki.postgresql.org/wiki/SSI#Intersecting_Data
func TestTxnWriteSkew2(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
// Setup
db.opt.managedTxns = false
txn := db.NewTransaction(true)
defer txn.Discard()
txn.SetEntry(NewEntry([]byte("a1"), []byte("10")))

Check failure on line 338 in txn_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `txn.SetEntry` is not checked (errcheck)
txn.SetEntry(NewEntry([]byte("a2"), []byte("20")))

Check failure on line 339 in txn_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `txn.SetEntry` is not checked (errcheck)
txn.SetEntry(NewEntry([]byte("b1"), []byte("100")))

Check failure on line 340 in txn_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `txn.SetEntry` is not checked (errcheck)
txn.SetEntry(NewEntry([]byte("b2"), []byte("200")))
require.NoError(t, txn.Commit())

txn1 := db.NewTransaction(true)
defer txn1.Discard()

itr := txn1.NewIterator(DefaultIteratorOptions)
sum := 0
{
for itr.Rewind(); itr.Valid(); itr.Next() {
if itr.Item().Key()[0] == 'a' {
a, _ := itr.Item().ValueCopy(nil)
val, _ := strconv.ParseUint(string(a), 10, 64)
sum += int(val)
}
}
itr.Close()
}

txn1.SetEntry(NewEntry([]byte("b3"), []byte("30")))

Check failure on line 360 in txn_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `txn1.SetEntry` is not checked (errcheck)

txn2 := db.NewTransaction(true)
defer txn2.Discard()
{
itr = txn2.NewIterator(DefaultIteratorOptions)
sum = 0
for itr.Rewind(); itr.Valid(); itr.Next() {
if itr.Item().Key()[0] == 'b' {
a, _ := itr.Item().ValueCopy(nil)
val, _ := strconv.ParseUint(string(a), 10, 64)
sum += int(val)
}
}
itr.Close()
}

txn2.SetEntry(NewEntry([]byte("a3"), []byte("300")))

Check failure on line 377 in txn_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `txn2.SetEntry` is not checked (errcheck)
require.NoError(t, txn2.Commit())

// Each transaction has modified what the other transaction would have read.
// If both were allowed to commit, this would break serializable behavior,
// because if they were run one at a time, one of the transactions would have seen the INSERT the other committed.
// We wait for a successful COMMIT of one of the transactions before we roll anything back,
// though, to ensure progress and prevent thrashing.
require.Error(t, txn1.Commit())
})
}

// a3, a2, b4 (del), b3, c2, c1
// Read at ts=4 -> a3, c2
// Read at ts=4(Uncommitted) -> a3, b4
Expand Down