Commit a55e6339 authored by craig[bot]

Merge #50633

50633: kv: unset CanForwardReadTimestamp flag on batches that span ranges r=nvanbenschoten a=nvanbenschoten

Fixes #50202.

In #50202, we saw that we would accidentally allow batches to be split on range boundaries and continue to carry the `CanForwardReadTimestamp` flag. This can lead to serializability violations under very specific conditions (a sketch of the access pattern follows the list):
1. the first operation that the transaction performs is a batch of locking read requests (due to implicit or explicit SFU)
2. this batch of locking reads spans multiple ranges
3. this batch of locking reads is issued in parallel by the DistSender
4. this locking read hits contention and is bumped on at least one of the ranges due to a WriteTooOld error
5. an unreplicated lock from one of the non-refreshed sub-batches is lost during a lease transfer.
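
For illustration, a minimal Go sketch of that access pattern, closely mirroring the `TestTxnCoordSenderRetries` case added in this diff; it assumes keys `a` and `b` live on different ranges, and the function name `lockingReadAcrossRanges` is made up for this sketch:

```go
package main

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
)

// lockingReadAcrossRanges issues a batch of locking reads as the
// transaction's first operation. If "a" and "b" sit on different ranges,
// the DistSender splits the batch and may send the parts in parallel;
// before this fix each part could still carry CanForwardReadTimestamp.
func lockingReadAcrossRanges(ctx context.Context, db *kv.DB) error {
	return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		b := txn.NewBatch()
		b.ScanForUpdate("a", "a\x00") // locking read on the first range
		b.ScanForUpdate("b", "b\x00") // locking read on the second range
		return txn.Run(ctx, b)
	})
}
```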

It turns out that the `kv/contention` roachtest meets these requirements perfectly when implicit SFU support is added to UPSERT statements: #50180. It creates a tremendous amount of contention and issues a batch of locking ScanRequests during a LookupJoin as its first operation. This materializes as ConditionFailedErrors (which should be impossible) in the CPuts that the UPSERT issues to maintain the table's secondary index.

This PR fixes this bug by ensuring that if a batch is going to be split across ranges and any of its requests would need to refresh on read timestamp bumps, it does not have its CanForwardReadTimestamp flag set. It would be incorrect to allow part of a batch to perform a server-side refresh if another part of the batch might have returned a different result at the higher timestamp, which is a fancy way of saying that it needs to refresh because it is using optimistic locking. Such behavior could cause a transaction to observe an inconsistent snapshot and violate serializability.

It then adds support for locking scans to kvnemesis, which would have caught the bug fairly easily.

Finally, it fixes a KV API UX issue around locking scans and retry errors. Before this change, it was possible for a non-transactional locking scan (which itself doesn't make much sense) to hit a WriteTooOld retry error. This was caused by eager propagation of WriteTooOld errors from MVCC when FailOnMoreRecent was enabled for an MVCCScan. I'd appreciate it if @itsbilal could give that last commit a review.
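
As a rough illustration (not part of the diff), a non-transactional locking scan can be issued directly through the `kv.DB` client; the `ScanForUpdate` signature below follows the `clientI` interface extended in the kvnemesis changes, and the wrapper name is hypothetical:

```go
package main

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
)

// nonTxnLockingScan performs a locking scan outside of any transaction.
// Before the last commit described above, such a scan could bubble up a
// WriteTooOldError even though there is no transaction to retry.
func nonTxnLockingScan(ctx context.Context, db *kv.DB) ([]kv.KeyValue, error) {
	// The final argument is maxRows; 0 means no limit.
	return db.ScanForUpdate(ctx, "a", "c", 0)
}
```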

Release note (bug fix): fix a rare bug where a multi-Range SELECT FOR UPDATE statement containing an IN clause could fail to observe a consistent snapshot and violate serializability.
Co-authored-by: Nathan VanBenschoten <[email protected]>
parents 61f2787d 3c656678
......@@ -67,6 +67,7 @@ template <bool reverse> class mvccScanner {
check_uncertainty_(timestamp < txn.max_timestamp),
kvs_(new chunkedBuffer),
intents_(new rocksdb::WriteBatch),
most_recent_timestamp_(),
peeked_(false),
iters_before_seek_(kMaxItersBeforeSeek / 2) {
memset(&results_, 0, sizeof(results_));
......@@ -109,6 +110,7 @@ template <bool reverse> class mvccScanner {
return results_;
}
getAndAdvance();
maybeFailOnMoreRecent();
return fillResults();
}
......@@ -133,6 +135,7 @@ template <bool reverse> class mvccScanner {
while (getAndAdvance()) {
}
maybeFailOnMoreRecent();
if (max_keys_ > 0 && kvs_->Count() == max_keys_ && advanceKey()) {
if (reverse) {
......@@ -257,11 +260,13 @@ template <bool reverse> class mvccScanner {
return true;
}
bool writeTooOldError(DBTimestamp ts) {
results_.write_too_old_timestamp = ts;
void maybeFailOnMoreRecent() {
if (results_.status.len != 0 || most_recent_timestamp_ == kZeroTimestamp) {
return;
}
results_.write_too_old_timestamp = most_recent_timestamp_;
kvs_->Clear();
intents_->Clear();
return false;
}
bool uncertaintyError(DBTimestamp ts) {
......@@ -290,7 +295,13 @@ template <bool reverse> class mvccScanner {
// 2. Our txn's read timestamp is less than the most recent
// version's timestamp and the scanner has been configured
// to throw a write too old error on more recent versions.
return writeTooOldError(cur_timestamp_);
// Merge the current timestamp with the maximum timestamp
// we've seen so we know to return an error, but then keep
// scanning so that we can return the largest possible time.
if (cur_timestamp_ > most_recent_timestamp_) {
most_recent_timestamp_ = cur_timestamp_;
}
return advanceKey();
}
if (check_uncertainty_) {
......@@ -771,6 +782,11 @@ template <bool reverse> class mvccScanner {
DBScanResults results_;
std::unique_ptr<chunkedBuffer> kvs_;
std::unique_ptr<rocksdb::WriteBatch> intents_;
// most_recent_timestamp_ stores the largest timestamp observed that is
// above the scan timestamp. Only applicable if fail_on_more_recent_ is
// true. If set and no other error is hit, a WriteTooOld error will be
// returned from the scan.
DBTimestamp most_recent_timestamp_;
std::string key_buf_;
std::string saved_buf_;
bool peeked_;
......
......@@ -637,6 +637,40 @@ func splitBatchAndCheckForRefreshSpans(
return parts
}
// unsetCanForwardReadTimestampFlag ensures that if a batch is going to
// be split across ranges and any of its requests would need to refresh
// on read timestamp bumps, it does not have its CanForwardReadTimestamp
// flag set. It would be incorrect to allow part of a batch to perform a
// server-side refresh if another part of the batch that was sent to a
// different range would also need to refresh. Such behavior could cause
// a transaction to observe an inconsistent snapshot and violate
// serializability.
func unsetCanForwardReadTimestampFlag(ctx context.Context, ba *roachpb.BatchRequest) {
if !ba.CanForwardReadTimestamp {
// Already unset.
return
}
for _, req := range ba.Requests {
if roachpb.NeedsRefresh(req.GetInner()) {
// Unset the flag.
ba.CanForwardReadTimestamp = false
// We would need to also unset the CanCommitAtHigherTimestamp flag
// on any EndTxn request in the batch, but it turns out that because
// we call this function when a batch is split across ranges, we'd
// already have bailed if the EndTxn wasn't a parallel commit — and
// if it was a parallel commit then we must not have any requests
// that need to refresh (see txnCommitter.canCommitInParallel).
// Assert this for our own sanity.
if _, ok := ba.GetArg(roachpb.EndTxn); ok {
log.Fatalf(ctx, "batch unexpectedly contained requests "+
"that need to refresh and an EndTxn request: %s", ba)
}
return
}
}
}
// Send implements the batch.Sender interface. It subdivides the Batch
// into batches admissible for sending (preventing certain illegal
// mixtures of requests), executes each individual part (which may
......@@ -1103,6 +1137,8 @@ func (ds *DistSender) divideAndSendBatchToRanges(
return nil, errNo1PCTxn
}
}
// Make sure the CanForwardReadTimestamp flag is set to false, if necessary.
unsetCanForwardReadTimestampFlag(ctx, &ba)
// Make an empty slice of responses which will be populated with responses
// as they come in via Combine().
......
......@@ -2371,6 +2371,41 @@ func TestTxnCoordSenderRetries(t *testing.T) {
priorReads: true,
txnCoordRetry: true,
},
{
name: "write too old with multi-range locking read (err on first range)",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "a", "put")
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
_, err := txn.ScanForUpdate(ctx, "a", "c", 0)
return err
},
txnCoordRetry: true,
},
{
name: "write too old with multi-range locking read (err on second range)",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "b", "put")
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
_, err := txn.ScanForUpdate(ctx, "a", "c", 0)
return err
},
txnCoordRetry: true,
},
{
name: "write too old with multi-range batch of locking reads",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "a", "put")
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
b.ScanForUpdate("a", "a\x00")
b.ScanForUpdate("b", "b\x00")
return txn.Run(ctx, b)
},
txnCoordRetry: true,
},
{
// This test sends a 1PC batch with Put+EndTxn.
// The Put gets a write too old error but, since there's no refresh spans,
......
......@@ -124,6 +124,7 @@ type clientI interface {
Get(context.Context, interface{}) (kv.KeyValue, error)
Put(context.Context, interface{}, interface{}) error
Scan(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error)
ScanForUpdate(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error)
Run(context.Context, *kv.Batch) error
}
......@@ -143,7 +144,11 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation) {
err := db.Put(ctx, o.Key, o.Value)
o.Result = resultError(ctx, err)
case *ScanOperation:
kvs, err := db.Scan(ctx, o.Key, o.EndKey, 0 /* maxRows */)
fn := db.Scan
if o.ForUpdate {
fn = db.ScanForUpdate
}
kvs, err := fn(ctx, o.Key, o.EndKey, 0 /* maxRows */)
if err != nil {
o.Result = resultError(ctx, err)
} else {
......@@ -174,7 +179,11 @@ func applyBatchOp(
case *PutOperation:
b.Put(subO.Key, subO.Value)
case *ScanOperation:
b.Scan(subO.Key, subO.EndKey)
if subO.ForUpdate {
b.ScanForUpdate(subO.Key, subO.EndKey)
} else {
b.Scan(subO.Key, subO.EndKey)
}
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
}
......
......@@ -55,7 +55,7 @@ func TestApplier(t *testing.T) {
check(t, step(put(`a`, `1`)), `db0.Put(ctx, "a", 1) // nil`)
check(t, step(get(`a`)), `db1.Get(ctx, "a") // ("1", nil)`)
check(t, step(scan(`a`, `c`)), `db0.Scan(ctx, "a", "c", 0) // (["a":"1"], nil)`)
check(t, step(scanForUpdate(`a`, `c`)), `db0.ScanForUpdate(ctx, "a", "c", 0) // (["a":"1"], nil)`)
check(t, step(put(`b`, `2`)), `db1.Put(ctx, "b", 2) // nil`)
check(t, step(get(`b`)), `db0.Get(ctx, "b") // ("2", nil)`)
......@@ -63,7 +63,7 @@ func TestApplier(t *testing.T) {
checkErr(t, step(get(`a`)), `db0.Get(ctx, "a") // (nil, aborted in distSender: context canceled)`)
checkErr(t, step(put(`a`, `1`)), `db1.Put(ctx, "a", 1) // aborted in distSender: context canceled`)
checkErr(t, step(scan(`a`, `c`)), `db0.Scan(ctx, "a", "c", 0) // (nil, aborted in distSender: context canceled)`)
checkErr(t, step(scanForUpdate(`a`, `c`)), `db0.ScanForUpdate(ctx, "a", "c", 0) // (nil, aborted in distSender: context canceled)`)
// Batch
check(t, step(batch(put(`b`, `2`), get(`a`), scan(`a`, `c`))), `
......@@ -75,12 +75,12 @@ func TestApplier(t *testing.T) {
db1.Run(ctx, b) // nil
}
`)
checkErr(t, step(batch(put(`b`, `2`), get(`a`), scan(`a`, `c`))), `
checkErr(t, step(batch(put(`b`, `2`), get(`a`), scanForUpdate(`a`, `c`))), `
{
b := &Batch{}
b.Put(ctx, "b", 2) // aborted in distSender: context canceled
b.Get(ctx, "a") // (nil, aborted in distSender: context canceled)
b.Scan(ctx, "a", "c") // (nil, aborted in distSender: context canceled)
b.ScanForUpdate(ctx, "a", "c") // (nil, aborted in distSender: context canceled)
db0.Run(ctx, b) // aborted in distSender: context canceled
}
`)
......
......@@ -23,9 +23,8 @@
// guarantees.
//
// TODO
// - Validate read only transactions
// - CPut/InitPut/Increment/Delete
// - DeleteRange/ClearRange/RevertRange/Scan/ReverseScan
// - DeleteRange/ClearRange/RevertRange/ReverseScan
// - TransferLease
// - ExportRequest
// - AddSSTable
......
......@@ -80,6 +80,9 @@ type ClientOperationConfig struct {
PutExisting int
// Scan is an operation that Scans a key range that may contain values.
Scan int
// ScanForUpdate is an operation that Scans a key range that may contain
// values using a per-key locking scan.
ScanForUpdate int
}
// BatchOperationConfig configures the relative probability of generating a
......@@ -132,11 +135,12 @@ type ChangeReplicasConfig struct {
// yet pass (for example, if the new operation finds a kv bug or edge case).
func newAllOperationsConfig() GeneratorConfig {
clientOpConfig := ClientOperationConfig{
GetMissing: 1,
GetExisting: 1,
PutMissing: 1,
PutExisting: 1,
Scan: 1,
GetMissing: 1,
GetExisting: 1,
PutMissing: 1,
PutExisting: 1,
Scan: 1,
ScanForUpdate: 1,
}
batchOpConfig := BatchOperationConfig{
Batch: 4,
......@@ -359,6 +363,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig
addOpGen(allowed, randPutExisting, c.PutExisting)
}
addOpGen(allowed, randScan, c.Scan)
addOpGen(allowed, randScanForUpdate, c.ScanForUpdate)
}
func (g *generator) registerBatchOps(allowed *[]opGen, c *BatchOperationConfig) {
......@@ -397,6 +402,12 @@ func randScan(g *generator, rng *rand.Rand) Operation {
return scan(key, endKey)
}
func randScanForUpdate(g *generator, rng *rand.Rand) Operation {
op := randScan(g, rng)
op.Scan.ForUpdate = true
return op
}
func randSplitNew(g *generator, rng *rand.Rand) Operation {
key := randKey(rng)
g.currentSplits[key] = struct{}{}
......@@ -574,6 +585,10 @@ func scan(key, endKey string) Operation {
return Operation{Scan: &ScanOperation{Key: []byte(key), EndKey: []byte(endKey)}}
}
func scanForUpdate(key, endKey string) Operation {
return Operation{Scan: &ScanOperation{Key: []byte(key), EndKey: []byte(endKey), ForUpdate: true}}
}
func split(key string) Operation {
return Operation{Split: &SplitOperation{Key: []byte(key)}}
}
......
......@@ -110,7 +110,11 @@ func TestRandStep(t *testing.T) {
client.PutMissing++
}
case *ScanOperation:
client.Scan++
if o.ForUpdate {
client.ScanForUpdate++
} else {
client.Scan++
}
case *BatchOperation:
batch.Batch++
countClientOps(&batch.Ops, nil, o.Ops...)
......
......@@ -179,12 +179,16 @@ func (op PutOperation) format(w *strings.Builder, fctx formatCtx) {
}
func (op ScanOperation) format(w *strings.Builder, fctx formatCtx) {
methodName := `Scan`
if op.ForUpdate {
methodName = `ScanForUpdate`
}
// NB: DB.Scan has a maxRows parameter that Batch.Scan does not have.
maxRowsArg := `, 0`
if fctx.receiver == `b` {
maxRowsArg = ``
}
fmt.Fprintf(w, `%s.Scan(ctx, %s, %s%s)`, fctx.receiver, roachpb.Key(op.Key), roachpb.Key(op.EndKey), maxRowsArg)
fmt.Fprintf(w, `%s.%s(ctx, %s, %s%s)`, fctx.receiver, methodName, roachpb.Key(op.Key), roachpb.Key(op.EndKey), maxRowsArg)
switch op.Result.Type {
case ResultType_Error:
err := errors.DecodeError(context.TODO(), *op.Result.Err)
......
This diff was suppressed by a .gitattributes entry.
......@@ -46,7 +46,8 @@ message GetOperation {
message ScanOperation {
bytes key = 1;
bytes end_key = 2;
Result result = 3 [(gogoproto.nullable) = false];
bool for_update = 3;
Result result = 4 [(gogoproto.nullable) = false];
}
message PutOperation {
......
......@@ -159,6 +159,7 @@ func (r *Replica) handleReadOnlyLocalEvalResult(
if lResult.AcquiredLocks != nil {
// These will all be unreplicated locks.
log.Eventf(ctx, "acquiring %d unreplicated locks", len(lResult.AcquiredLocks))
for i := range lResult.AcquiredLocks {
r.concMgr.OnLockAcquired(ctx, &lResult.AcquiredLocks[i])
}
......
......@@ -2530,9 +2530,11 @@ type MVCCScanResult struct {
//
// When scanning in "fail on more recent" mode, a WriteTooOldError will be
// returned if the scan observes a version with a timestamp above the read
// timestamp. Similarly, a WriteIntentError will be returned if the scan
// observes another transaction's intent, even if it has a timestamp above
// the read timestamp.
// timestamp. If the scan observes multiple versions with timestamp above
// the read timestamp, the maximum will be returned in the WriteTooOldError.
// Similarly, a WriteIntentError will be returned if the scan observes
// another transaction's intent, even if it has a timestamp above the read
// timestamp.
func MVCCScan(
ctx context.Context,
reader Reader,
......
......@@ -125,6 +125,10 @@ type pebbleMVCCScanner struct {
curValue []byte
results pebbleResults
intents pebble.Batch
// mostRecentTS stores the largest timestamp observed that is above the scan
// timestamp. Only applicable if failOnMoreRecent is true. If set and no
// other error is hit, a WriteTooOld error will be returned from the scan.
mostRecentTS hlc.Timestamp
// Stores any error returned. If non-nil, iteration short circuits.
err error
// Number of iterations to try before we do a Seek/SeekReverse. Stays within
......@@ -161,6 +165,7 @@ func (p *pebbleMVCCScanner) get() {
return
}
p.getAndAdvance()
p.maybeFailOnMoreRecent()
}
// scan iterates until maxKeys records are in results, or the underlying
......@@ -179,6 +184,7 @@ func (p *pebbleMVCCScanner) scan() (*roachpb.Span, error) {
for p.getAndAdvance() {
}
p.maybeFailOnMoreRecent()
var resume *roachpb.Span
if p.maxKeys > 0 && p.results.count == p.maxKeys && p.advanceKey() {
......@@ -252,14 +258,17 @@ func (p *pebbleMVCCScanner) getFromIntentHistory() (value []byte, found bool) {
return intent.Value, true
}
// Returns a write too old error with the specified timestamp.
func (p *pebbleMVCCScanner) writeTooOldError(ts hlc.Timestamp) bool {
// Returns a write too old error if an error is not already set on the scanner
// and a more recent value was found during the scan.
func (p *pebbleMVCCScanner) maybeFailOnMoreRecent() {
if p.err != nil || p.mostRecentTS.IsEmpty() {
return
}
// The txn can't write at the existing timestamp, so we provide the error
// with the timestamp immediately after it.
p.err = roachpb.NewWriteTooOldError(p.ts, ts.Next())
p.err = roachpb.NewWriteTooOldError(p.ts, p.mostRecentTS.Next())
p.results.clear()
p.intents.Reset()
return false
}
// Returns an uncertainty error with the specified timestamp and p.txn.
......@@ -284,7 +293,11 @@ func (p *pebbleMVCCScanner) getAndAdvance() bool {
// 2. Our txn's read timestamp is less than the most recent
// version's timestamp and the scanner has been configured
// to throw a write too old error on more recent versions.
return p.writeTooOldError(p.curKey.Timestamp)
// Merge the current timestamp with the maximum timestamp
// we've seen so we know to return an error, but then keep
// scanning so that we can return the largest possible time.
p.mostRecentTS.Forward(p.curKey.Timestamp)
return p.advanceKey()
}
if p.checkUncertainty {
......
......@@ -160,6 +160,50 @@ scan k=k2 end=k3 ts=11,0 failOnMoreRecent
scan: "k2"-"k3" -> <no data>
error: (*roachpb.WriteIntentError:) conflicting intents on "k2"
# More test cases:
#
# span = [k1, k3)
# op = scan
# for ts in (9, 10, 11):
# for failOnMoreRecent in (false, true):
# testCase()
#
run ok
scan k=k1 end=k3 ts=9,0
----
scan: "k1"-"k3" -> <no data>
run error
scan k=k1 end=k3 ts=9,0 failOnMoreRecent
----
scan: "k1"-"k3" -> <no data>
error: (*roachpb.WriteTooOldError:) WriteTooOldError: write at timestamp 0.000000009,0 too old; wrote at 0.000000010,1
run error
scan k=k1 end=k3 ts=10,0
----
scan: "k1"-"k3" -> <no data>
error: (*roachpb.WriteIntentError:) conflicting intents on "k2"
run error
scan k=k1 end=k3 ts=10,0 failOnMoreRecent
----
scan: "k1"-"k3" -> <no data>
error: (*roachpb.WriteIntentError:) conflicting intents on "k2"
run error
scan k=k1 end=k3 ts=11,0
----
scan: "k1"-"k3" -> <no data>
error: (*roachpb.WriteIntentError:) conflicting intents on "k2"
run error
scan k=k1 end=k3 ts=11,0 failOnMoreRecent
----
scan: "k1"-"k3" -> <no data>
error: (*roachpb.WriteIntentError:) conflicting intents on "k2"
# The failOnMoreRecent and inconsistent options cannot be used together.
run error
......@@ -173,3 +217,38 @@ scan k=k1 end=k2 ts=9,0 inconsistent failOnMoreRecent
----
scan: "k1"-"k2" -> <no data>
error: (*withstack.withStack:) cannot allow inconsistent reads with fail on more recent option
# If a failOnMoreRecent scan observes multiple values above its timestamp, it
# should return a WriteTooOld error with a timestamp based on the most recent
# value it saw.
run ok
put k=a v=v ts=11,0
put k=b v=v ts=13,0
put k=c v=v ts=12,0
----
>> at end:
data: "a"/0.000000011,0 -> /BYTES/v
data: "b"/0.000000013,0 -> /BYTES/v
data: "c"/0.000000012,0 -> /BYTES/v
data: "k1"/0.000000010,0 -> /BYTES/v
meta: "k2"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=0.000000010,0 min=0,0 seq=0} ts=0.000000010,0 del=false klen=12 vlen=6
data: "k2"/0.000000010,0 -> /BYTES/v
run error
scan k=a end=b_next ts=9,0 failOnMoreRecent
----
scan: "a"-"b_next" -> <no data>
error: (*roachpb.WriteTooOldError:) WriteTooOldError: write at timestamp 0.000000009,0 too old; wrote at 0.000000013,1
run error
scan k=a end=c_next ts=9,0 failOnMoreRecent
----
scan: "a"-"c_next" -> <no data>
error: (*roachpb.WriteTooOldError:) WriteTooOldError: write at timestamp 0.000000009,0 too old; wrote at 0.000000013,1
run error
scan k=b end=c_next ts=9,0 failOnMoreRecent
----
scan: "b"-"c_next" -> <no data>
error: (*roachpb.WriteTooOldError:) WriteTooOldError: write at timestamp 0.000000009,0 too old; wrote at 0.000000013,1