Skip to content

Commit

Permalink
Merge #88550 #88641
Browse files Browse the repository at this point in the history
88550: kvserver: use execution timestamps for verified when available r=erikgrinaker a=tbg

Now that "most" operations save their execution timestamps, use them
for verification.

This has the undesirable side effect of failing the entire test suite,
which didn't bother specifying timestamps for most operations.

Now they are required, and need to be present, at least for all
mutations.

I took the opportunity to also clean up the test helpers a bit,
so now we don't have to pass an `error` when it's not required.

The big remaining caveat is that units that return with an ambiguous
result don't necessarily have a commit timestamp. I *think* this is only
an implementation detail. We *could* ensure that `AmbiguousResultError`
always contains the one possible commit timestamp. This should work
since `TxnCoordSender` is always local to `kvnemesis`, and so there's
no "fallible" component between the two.

This would result in a significant simplification of `kvnemesis`, since
as is when there are ambiguous deletions, we have to materialize them
but cannot assign them a timestamp. This complicates various code paths
and to be honest I'm not even sure what exactly we verify and how it all
works when there are such "half-materialized" writes. I would rather do
away with the concept altogether. Clearly we also won't be able to
simplify the verification to simply use commit order if there are
operations that don't have a timestamp, which is another reason to keep
pushing on this.

Release note: None


88641: workload: Bump prepare timeout to 90 minute r=aayushshah15 a=andrewbaptist

Relates to #72083. Allow scatter to complete.

Release note: None

Co-authored-by: Tobias Grieger <tobias.b.grieger@gmail.com>
Co-authored-by: Andrew Baptist <baptist@cockroachlabs.com>
  • Loading branch information
3 people committed Sep 26, 2022
3 parents db9f943 + 09043a3 + 643d7d2 commit beb40b5
Show file tree
Hide file tree
Showing 3 changed files with 530 additions and 513 deletions.
115 changes: 66 additions & 49 deletions pkg/kv/kvnemesis/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,10 @@ func (*observedScan) observedMarker() {}
type validator struct {
kvs *Engine

// Ops for the current atomic unit. This is reset between units, in
// checkAtomic, which then calls processOp (which might recurse owing
// to the existence of txn closures, batches, etc).
curOps []observedOp
// Observations for the current atomic unit. This is reset between units, in
// checkAtomic, which then calls processOp (which might recurse owing to the
// existence of txn closures, batches, etc).
curObservations []observedOp

// NB: The Generator carefully ensures that each value written is unique
// globally over a run, so there's a 1:1 relationship between a value that was
Expand Down Expand Up @@ -356,17 +356,17 @@ func (v *validator) processOp(buffering bool, op Operation) {
case *GetOperation:
v.failIfError(op, t.Result)
if !buffering {
v.checkAtomic(`get`, t.Result, hlc.Timestamp{}, op)
v.checkAtomic(`get`, t.Result, op)
} else {
read := &observedRead{
Key: t.Key,
Value: roachpb.Value{RawBytes: t.Result.Value},
}
v.curOps = append(v.curOps, read)
v.curObservations = append(v.curObservations, read)
}
case *PutOperation:
if !buffering {
v.checkAtomic(`put`, t.Result, hlc.Timestamp{}, op)
v.checkAtomic(`put`, t.Result, op)
} else {
// Accumulate all the writes for this transaction.
kv, ok := v.kvByValue[string(t.Value)]
Expand All @@ -379,11 +379,11 @@ func (v *validator) processOp(buffering bool, op Operation) {
if write.Materialized {
write.Timestamp = kv.Key.Timestamp
}
v.curOps = append(v.curOps, write)
v.curObservations = append(v.curObservations, write)
}
case *DeleteOperation:
if !buffering {
v.checkAtomic(`delete`, t.Result, hlc.Timestamp{}, op)
v.checkAtomic(`delete`, t.Result, op)
} else {
// NB: While Put operations can be identified as having materialized
// (or not) in the storage engine because the Generator guarantees each
Expand All @@ -395,17 +395,17 @@ func (v *validator) processOp(buffering bool, op Operation) {
// operation for a key in a given transaction was a Delete, and
// validating that a potential tombstone for that key was stored.
// This validation must be done at the end of the transaction;
// specifically, in the function `checkCommittedTxn(..)` where it looks
// specifically, in the function `checkAtomicCommitted(..)` where it looks
// up a corresponding tombstone with `getDeleteForKey(..)`.
write := &observedWrite{
Key: t.Key,
Value: roachpb.Value{},
}
v.curOps = append(v.curOps, write)
v.curObservations = append(v.curObservations, write)
}
case *DeleteRangeOperation:
if !buffering {
v.checkAtomic(`deleteRange`, t.Result, hlc.Timestamp{}, op)
v.checkAtomic(`deleteRange`, t.Result, op)
} else {
// For the purposes of validation, DelRange operations decompose into
// a specialized scan for keys with non-nil values, followed by
Expand Down Expand Up @@ -433,8 +433,8 @@ func (v *validator) processOp(buffering bool, op Operation) {
}
deleteOps[i] = write
}
v.curOps = append(v.curOps, scan)
v.curOps = append(v.curOps, deleteOps...)
v.curObservations = append(v.curObservations, scan)
v.curObservations = append(v.curObservations, deleteOps...)
}
case *ScanOperation:
v.failIfError(op, t.Result)
Expand All @@ -443,7 +443,7 @@ func (v *validator) processOp(buffering bool, op Operation) {
if t.Reverse {
atomicScanType = `reverse scan`
}
v.checkAtomic(atomicScanType, t.Result, hlc.Timestamp{}, op)
v.checkAtomic(atomicScanType, t.Result, op)
} else {
scan := &observedScan{
Span: roachpb.Span{
Expand All @@ -459,7 +459,7 @@ func (v *validator) processOp(buffering bool, op Operation) {
Value: roachpb.Value{RawBytes: kv.Value},
}
}
v.curOps = append(v.curOps, scan)
v.curObservations = append(v.curObservations, scan)
}
case *SplitOperation:
execTimestampStrictlyOptional = true
Expand Down Expand Up @@ -554,7 +554,7 @@ func (v *validator) processOp(buffering bool, op Operation) {
if !resultIsRetryable(t.Result) {
v.failIfError(op, t.Result)
if !buffering {
v.checkAtomic(`batch`, t.Result, hlc.Timestamp{}, t.Ops...)
v.checkAtomic(`batch`, t.Result, t.Ops...)
} else {
for _, op := range t.Ops {
v.processOp(buffering, op)
Expand All @@ -566,15 +566,7 @@ func (v *validator) processOp(buffering bool, op Operation) {
if t.CommitInBatch != nil {
ops = append(ops, t.CommitInBatch.Ops...)
}
var optOpsTimestamp hlc.Timestamp
if t.Result.Err == nil {
if t.Txn == nil {
v.failures = append(v.failures, errors.AssertionFailedf("missing transaction"))
break
}
optOpsTimestamp = t.Txn.WriteTimestamp
}
v.checkAtomic(`txn`, t.Result, optOpsTimestamp, ops...)
v.checkAtomic(`txn`, t.Result, ops...)
default:
panic(errors.AssertionFailedf(`unknown operation type: %T %v`, t, t))
}
Expand All @@ -587,28 +579,40 @@ func (v *validator) processOp(buffering bool, op Operation) {
// checkAtomic verifies a set of operations that should be atomic by trying to find
// a timestamp at which the observed reads and writes of the operations (as executed
// in the order in which they appear in the arguments) match the MVCC history.
func (v *validator) checkAtomic(
atomicType string, result Result, optOpsTimestamp hlc.Timestamp, ops ...Operation,
) {
func (v *validator) checkAtomic(atomicType string, result Result, ops ...Operation) {
for _, op := range ops {
// NB: we're not really necessarily in a txn, but passing true here means that
// we have an atomic unit, which is also the case if we are called here by a
// non-transactional Put, for example.
v.processOp(isBuffering, op)
}
txnObservations := v.curOps
v.curOps = nil
observations := v.curObservations
v.curObservations = nil

if result.Type != ResultType_Error {
v.checkCommittedTxn(`committed `+atomicType, txnObservations, optOpsTimestamp)
// The timestamp is not optional in this case. Note however that at the time
// of writing, checkAtomicCommitted doesn't capitalize on this unconditional
// presence yet, and most unit tests don't specify it for reads.
if result.OptionalTimestamp.IsEmpty() {
err := errors.AssertionFailedf("operation has no execution timestamp: %s", result)
v.failures = append(v.failures, err)
}
v.checkAtomicCommitted(`committed `+atomicType, observations, result.OptionalTimestamp)
} else if resultIsAmbiguous(result) {
v.checkAmbiguousTxn(`ambiguous `+atomicType, txnObservations)
v.checkAtomicAmbiguous(`ambiguous `+atomicType, observations)
} else {
v.checkUncommittedTxn(`uncommitted `+atomicType, txnObservations)
v.checkAtomicUncommitted(`uncommitted `+atomicType, observations)
}
}

func (v *validator) checkCommittedTxn(
// checkAtomicCommitted verifies an atomic unit (i.e. single cmd, batch, or txn) that
// was successful. Its writes thus must be present, and (as is always the case, no
// matter the outcome) its reads must have been valid.
//
// The execution timestamp optOptsTimestamp is always present for operations that
// succeeded in a "normal" way. However, for ambiguous results, it is not always
// present. This limitation could be lifted, see checkAtomicAmbiguous.
func (v *validator) checkAtomicCommitted(
atomicType string, txnObservations []observedOp, optOpsTimestamp hlc.Timestamp,
) {
// The following works by verifying that there is at least one time at which
Expand Down Expand Up @@ -699,10 +703,17 @@ func (v *validator) checkCommittedTxn(
key := string(o.Key)
v.committedDeletesForKey[key]++
if optOpsTimestamp.IsEmpty() {
// In the case that the delete is not in a transaction (or in an
// ambiguous transaction), we do not match it to a specific
// tombstone as we cannot be certain which tombstone resulted from
// this operation; hence, we leave the timestamp empty.
// Special case: our operation doesn't know at which timestamp
// it wrote and so we're unable to match it to a particular tombstone
// and can only check the cardinality - if there was a tombstone left,
// we assume it's ours.
//
// We leave the Timestamp field empty as a result (if there are
// multiple tombstones left, how do we know which one is ours?) and
// everyone else needs to be able to handle this special case.
//
// TODO(tbg): see checkAtomicAmbiguous about letting ambiguously
// committed operations learn their commit timestamp.
o.Materialized = v.committedDeletesForKey[key] <= len(v.tombstonesForKey[key])
} else if storedDelete, ok := v.getDeleteForKey(key, optOpsTimestamp); ok {
o.Materialized = true
Expand Down Expand Up @@ -836,16 +847,18 @@ func (v *validator) checkCommittedTxn(
}
}

func (v *validator) checkAmbiguousTxn(atomicType string, txnObservations []observedOp) {
func (v *validator) checkAtomicAmbiguous(atomicType string, txnObservations []observedOp) {
var somethingCommitted bool
deletedKeysInTxn := make(map[string]int)
var hadWrite bool
var maybeExecTS hlc.Timestamp
for _, observation := range txnObservations {
switch o := observation.(type) {
case *observedWrite:
hadWrite = true
if o.Materialized {
somethingCommitted = true
maybeExecTS.Forward(o.Timestamp) // use Forward() just in case o.Timestamp is zero
break
}
if o.isDelete() && len(v.tombstonesForKey[string(o.Key)]) > v.committedDeletesForKey[string(o.Key)] {
Expand All @@ -861,6 +874,13 @@ func (v *validator) checkAmbiguousTxn(atomicType string, txnObservations []obser
// resulting from a delete operation, it is impossible to validate if the
// transaction was actually atomic. For now, we have chosen to fail loudly,
// though if we are able to validate properly, this should be removed.
//
// TODO(tbg): this might be addressable. For an ambiguous transaction we
// should still be able to salvage the timestamp at which the transaction
// would have committed if it did, because kvnemesis always has a local
// TxnCoordSender which always knows the one possible commit timestamp
// and so it's simply a matter of making sure this information is
// guaranteed to flow back with the AmbiguousResultError.
err := errors.Errorf(
`unable to validate delete operations in ambiguous transactions: %s`,
printObserved(txnObservations...),
Expand All @@ -875,19 +895,16 @@ func (v *validator) checkAmbiguousTxn(atomicType string, txnObservations []obser
// later deletes may show "committed delete missing write" errors.
v.committedDeletesForKey[key]++
}
} else if !hadWrite {
// TODO(dan): Is it possible to receive an ambiguous read-only txn? Assume
// committed for now because the committed case has assertions about reads
// but the uncommitted case doesn't and this seems to work.
v.checkCommittedTxn(atomicType, txnObservations, hlc.Timestamp{})
} else if somethingCommitted {
v.checkCommittedTxn(atomicType, txnObservations, hlc.Timestamp{})
} else if !hadWrite || somethingCommitted {
v.checkAtomicCommitted(atomicType, txnObservations, maybeExecTS)
} else {
v.checkUncommittedTxn(atomicType, txnObservations)
// This is a writing transaction but not a single one of its writes
// showed up in KV, so verify that it is uncommitted.
v.checkAtomicUncommitted(atomicType, txnObservations)
}
}

func (v *validator) checkUncommittedTxn(atomicType string, txnObservations []observedOp) {
func (v *validator) checkAtomicUncommitted(atomicType string, txnObservations []observedOp) {
var failure string
for _, observed := range txnObservations {
if failure != `` {
Expand Down

0 comments on commit beb40b5

Please sign in to comment.