Commit a9ff82f3 authored by craig[bot]

Merge #49997 #50802

49997: kvclient: merge RangeDescriptorCache and LeaseholderCache r=andreimatei a=andreimatei

Each patch works towards integrating leaseholder information into the RangeDescriptorCache, culminating in the last commit, which deletes the LeaseholderCache.

The motivation for this is that having these two separate data structures doesn't make sense: all the users want both descriptor and leaseholder information, and having incoherence between the two caches (i.e. a leaseholder that's not part of the cached descriptor, or a leaseholder with a missing descriptor) is very awkward.

I've got patches coming that update these caches more, and having a single cache to deal with makes things much easier.

50802: Makefile: fix vendor modvendor r=petermattis a=otan

modvendor has to be installed while the vendor directory is being rebuilt, but a
dependency may have changed by then, so `go mod` complains about inconsistent
versioning. To fix this, use -mod=mod when installing modvendor.
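
A minimal sketch of the resulting invocation (assuming Go's behavior of
defaulting to -mod=vendor whenever a vendor directory is present):

    # Without the flag, go install resolves modvendor from ./vendor, which is
    # inconsistent mid-rebuild; -mod=mod resolves it via go.mod instead.
    go install -v -mod=mod github.com/goware/modvendor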

Release note: None
Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Oliver Tan <[email protected]>
......@@ -314,7 +314,9 @@ $(call make-lazy,term-reset)
# Force vendor directory to rebuild.
.PHONY: vendor_rebuild
vendor_rebuild: bin/.submodules-initialized
$(GO_INSTALL) -v github.com/goware/modvendor
# Use -mod=mod, as -mod=vendor will try to install from the vendor directory,
# which may be mismatched during a rebuild.
$(GO_INSTALL) -v -mod=mod github.com/goware/modvendor
./build/vendor_rebuild.sh
# Tell Make to delete the target if its recipe fails. Otherwise, if a recipe
......
This diff was suppressed by a .gitattributes entry (6 files).
......@@ -108,7 +108,6 @@ func distBackup(
sql.NewMetadataCallbackWriter(rowResultWriter, metaFn),
tree.Rows,
nil, /* rangeCache */
nil, /* leaseCache */
noTxn, /* txn - the flow does not read or write the database */
func(ts hlc.Timestamp) {},
evalCtx.Tracing,
......
......@@ -175,7 +175,6 @@ func distChangefeedFlow(
resultRows,
tree.Rows,
execCfg.RangeDescriptorCache,
execCfg.LeaseHolderCache,
noTxn,
func(ts hlc.Timestamp) {},
evalCtx.Tracing,
......
......@@ -540,13 +540,6 @@ func (g *Gossip) GetNodeIDSQLAddress(nodeID roachpb.NodeID) (*util.UnresolvedAdd
return g.getNodeIDSQLAddressLocked(nodeID)
}
// GetNodeIDForStoreID looks up the NodeID by StoreID.
func (g *Gossip) GetNodeIDForStoreID(storeID roachpb.StoreID) (roachpb.NodeID, error) {
g.mu.RLock()
defer g.mu.RUnlock()
return g.getNodeIDForStoreIDLocked(storeID)
}
// GetNodeDescriptor looks up the descriptor of the node by ID.
func (g *Gossip) GetNodeDescriptor(nodeID roachpb.NodeID) (*roachpb.NodeDescriptor, error) {
g.mu.RLock()
......@@ -884,13 +877,6 @@ func (g *Gossip) updateStoreMap(key string, content roachpb.Value) {
g.storeMap[desc.StoreID] = desc.Node.NodeID
}
func (g *Gossip) getNodeIDForStoreIDLocked(storeID roachpb.StoreID) (roachpb.NodeID, error) {
if nodeID, ok := g.storeMap[storeID]; ok {
return nodeID, nil
}
return 0, errors.Errorf("unable to look up Node ID for store %d", storeID)
}
func (g *Gossip) updateClients() {
nodeID := g.NodeID.Get()
if nodeID == 0 {
......@@ -1718,7 +1704,6 @@ type DeprecatedOracleGossip interface {
// GetNodeDescriptor is used by oracles to order replicas by distance from the
// current locality.
GetNodeDescriptor(roachpb.NodeID) (*roachpb.NodeDescriptor, error)
GetNodeIDForStoreID(roachpb.StoreID) (roachpb.NodeID, error)
}
// DeprecatedOracleGossip returns a DeprecatedOracleGossip (a Gossip for use with the
......
......@@ -213,9 +213,9 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err
if k, err := keys.Addr(nextKey); err != nil {
log.Warningf(ctx, "failed to get RKey for flush key lookup")
} else {
r := b.rc.GetCachedRangeDescriptor(k, false /* inverted */)
r := b.rc.GetCached(k, false /* inverted */)
if r != nil {
b.flushKey = r.EndKey.AsRawKey()
b.flushKey = r.Desc.EndKey.AsRawKey()
log.VEventf(ctx, 3, "building sstable that will flush before %v", b.flushKey)
} else {
log.VEventf(ctx, 3, "no cached range desc available to determine sst flush key")
......@@ -428,7 +428,9 @@ func AddSSTable(
}
// This range has split -- we need to split the SST to try again.
if m := (*roachpb.RangeKeyMismatchError)(nil); errors.As(err, &m) {
split := m.MismatchedRange.EndKey.AsRawKey()
// TODO(andrei): We just use the first of m.Ranges; presumably we
// should be using all of them to avoid further retries.
split := m.Ranges()[0].Desc.EndKey.AsRawKey()
log.Infof(ctx, "SSTable cannot be added spanning range bounds %v, retrying...", split)
left, right, err := createSplitSSTable(ctx, db, item.start, split, item.disallowShadowing, iter, settings)
if err != nil {
......
......@@ -167,12 +167,18 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
if err != nil {
t.Fatal(err)
}
r, _, err := s.DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache().LookupRangeDescriptorWithEvictionToken(
ctx, addr, nil, false)
tok, err := s.DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache().LookupWithEvictionToken(
ctx, addr, kvcoord.EvictionToken{}, false)
if err != nil {
t.Fatal(err)
}
mockCache.InsertRangeDescriptors(ctx, *r)
r := roachpb.RangeInfo{
Desc: *tok.Desc(),
}
if l := tok.Lease(); l != nil {
r.Lease = *l
}
mockCache.Insert(ctx, r)
ts := hlc.Timestamp{WallTime: 100}
b, err := bulk.MakeBulkAdder(
......@@ -278,6 +284,7 @@ func (m mockSender) SplitAndScatter(ctx context.Context, _ roachpb.Key, _ hlc.Ti
// spanning SST is being ingested over a span with a lot of splits.
func TestAddBigSpanningSSTWithSplits(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
if testing.Short() {
t.Skip("this test needs to do a larger SST to see the quadratic mem usage on retries kick in.")
......@@ -318,9 +325,9 @@ func TestAddBigSpanningSSTWithSplits(t *testing.T) {
} else if i == len(splits)-earlySplit {
late = getMem()
}
return &roachpb.RangeKeyMismatchError{
MismatchedRange: roachpb.RangeDescriptor{EndKey: roachpb.RKey(splits[i])},
}
return roachpb.NewRangeKeyMismatchError(
ctx, span.Key, span.EndKey,
&roachpb.RangeDescriptor{EndKey: roachpb.RKey(splits[i])}, nil /* lease */)
}
}
return nil
......@@ -330,7 +337,7 @@ func TestAddBigSpanningSSTWithSplits(t *testing.T) {
t.Logf("Adding %dkb sst spanning %d splits from %v to %v", len(sst)/kb, len(splits), start, end)
if _, err := bulk.AddSSTable(
context.Background(), mock, start, end, sst, false /* disallowShadowing */, enginepb.MVCCStats{}, cluster.MakeTestingClusterSettings(),
ctx, mock, start, end, sst, false /* disallowShadowing */, enginepb.MVCCStats{}, cluster.MakeTestingClusterSettings(),
); err != nil {
t.Fatal(err)
}
......
......@@ -13,16 +13,116 @@ package kvbase
import (
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util"
)
// RangeDescriptorCache is a simplified interface to the kv.RangeDescriptorCache
// for use at lower levels of the stack like storage.
type RangeDescriptorCache interface {
// Lookup looks up range information for the range containing key.
Lookup(ctx context.Context, key roachpb.RKey) (*RangeCacheEntry, error)
}
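// Illustrative usage sketch (hypothetical caller; rdc and key are assumed
// names): a storage-level consumer holding a kvbase.RangeDescriptorCache
// resolves a key to its cached descriptor/lease pair:
//
//   entry, err := rdc.Lookup(ctx, roachpb.RKey(key))
//   if err != nil {
//       return err
//   }
//   desc := entry.Desc   // always populated
//   lease := entry.Lease // may be Empty() if no lease information is cached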
// RangeCacheEntry represents one cache entry.
//
// The cache stores *RangeCacheEntry. Entries are immutable: cache lookups
// return the same *RangeCacheEntry to multiple queriers for efficiency, but
// nobody should modify the lookup result.
type RangeCacheEntry struct {
// Desc is always populated.
Desc roachpb.RangeDescriptor
// Lease has info on the range's lease. It can be Empty() if no lease
// information is known. When a lease is known, it is guaranteed that the
// lease comes from Desc's range id (i.e. we'll never put a lease from another
// range in here). This allows UpdateLease() to use Lease.Sequence to compare
// leases. Moreover, the lease will correspond to one of the replicas in Desc.
Lease roachpb.Lease
}
// LookupRangeDescriptor looks up a range descriptor based on a key.
LookupRangeDescriptor(
ctx context.Context, key roachpb.RKey,
) (*roachpb.RangeDescriptor, error)
func (e RangeCacheEntry) String() string {
return fmt.Sprintf("desc:%s, lease:%s", e.Desc, e.Lease)
}
// UpdateLease returns a new RangeCacheEntry with the receiver's descriptor and
// a new lease. The updated retval indicates whether the passed-in lease
// appears to be newer than the lease the entry had before. If updated is true,
// the caller should evict the existing entry (the receiver) and replace it
// with newEntry. (true, nil) may be returned, meaning that the existing entry
// should be evicted but this function cannot provide a replacement; this
// happens when the passed-in lease indicates a leaseholder that's not part of
// the entry's descriptor. In that case the descriptor must be stale, and the
// caller should read a new version.
//
// If updated=false is returned, then newEntry will be the same as the receiver.
// This means that the passed-in lease is older than the lease already in the
// entry.
//
// If the new leaseholder is not a replica in the descriptor, we assume the
// lease information to be more recent than the entry's descriptor, and we
// return true, nil. The caller should evict the receiver from the cache, but
// it'll have to do extra work to figure out what to insert instead.
func (e *RangeCacheEntry) UpdateLease(l *roachpb.Lease) (updated bool, newEntry *RangeCacheEntry) {
// If l is older than what the entry has (or the same), return early.
// A new lease with a sequence of 0 is presumed to be newer than anything, and
// an existing lease with a sequence of 0 is presumed to be older than
// anything.
//
// We handle the case of a lease with the sequence equal to the existing
// entry, but otherwise different. This results in the new lease updating the
// entry, because the existing lease might correspond to a proposed lease that
// a replica returned speculatively while a lease acquisition was in progress.
if l.Sequence != 0 && e.Lease.Sequence != 0 && l.Sequence < e.Lease.Sequence {
return false, e
}
if l.Equal(e.Lease) {
return false, e
}
// Check whether the lease we were given is compatible with the replicas in
// the descriptor. If it's not, the descriptor must be really stale, and the
// RangeCacheEntry needs to be evicted.
_, ok := e.Desc.GetReplicaDescriptorByID(l.Replica.ReplicaID)
if !ok {
return true, nil
}
// TODO(andrei): If the leaseholder is present, but the descriptor lists the
// replica as a learner, this is a sign of a stale descriptor. I'm not sure
// what to do about it, though.
return true, &RangeCacheEntry{
Desc: e.Desc,
Lease: *l,
}
}
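// Illustrative usage sketch (cache.Evict and cache.Insert are hypothetical
// operations, not defined in this file): how a caller applies UpdateLease's
// result:
//
//   updated, newEntry := entry.UpdateLease(lease)
//   if updated {
//       cache.Evict(ctx, entry)
//       if newEntry != nil {
//           cache.Insert(ctx, newEntry)
//       } else {
//           // The lease named a replica that's not in entry.Desc: the
//           // descriptor is stale, so do a fresh range lookup before
//           // caching anything for this span.
//       }
//   }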
// NewerThan returns true if the receiver represents newer information about the
// range than o. The descriptors are assumed to be overlapping.
//
// When comparing two overlapping entries for deciding which one is stale, the
// descriptor's generation is checked first. For equal descriptor generations,
// the lease sequence number is checked second. For equal lease sequences,
// returns false. Note that this means that an Empty() e.Lease is considered
// older than any lease in o.
func (e *RangeCacheEntry) NewerThan(o *RangeCacheEntry) bool {
if util.RaceEnabled {
if _, err := e.Desc.RSpan().Intersect(&o.Desc); err != nil {
panic(fmt.Sprintf("descriptors don't intersect: %s vs %s", e.Desc, o.Desc))
}
}
if e.Desc.Generation == o.Desc.Generation {
// If two RangeDescriptors overlap and have the same Generation, they must
// be referencing the same range, in which case their lease sequences are
// comparable.
if e.Desc.RangeID != o.Desc.RangeID {
panic(fmt.Sprintf("overlapping descriptors with same gen but different IDs: %s vs %s",
e.Desc, o.Desc))
}
return e.Lease.Sequence > o.Lease.Sequence
}
return e.Desc.Generation > o.Desc.Generation
}
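// Illustrative usage sketch (hypothetical names): when an insertion overlaps
// an existing cache entry, NewerThan decides which of the two survives:
//
//   if newEntry.NewerThan(existing) {
//       // Overwrite: existing carries stale descriptor/lease information.
//   } else {
//       // Keep existing and drop newEntry.
//   }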
This diff is collapsed.
......@@ -28,10 +28,9 @@ import (
)
type singleRangeInfo struct {
desc *roachpb.RangeDescriptor
rs roachpb.RSpan
ts hlc.Timestamp
token *EvictionToken
token EvictionToken
}
// RangeFeed divides a RangeFeed request on range boundaries and establishes a
......@@ -106,7 +105,6 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges(
nextRS.Key = partialRS.EndKey
select {
case rangeCh <- singleRangeInfo{
desc: desc,
rs: partialRS,
ts: ts,
token: ri.Token(),
......@@ -139,17 +137,18 @@ func (ds *DistSender) partialRangeFeed(
// Start a retry loop for sending the batch to the range.
for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); {
// If we've cleared the descriptor on a send failure, re-lookup.
if rangeInfo.desc == nil {
if rangeInfo.token.Empty() {
var err error
rangeInfo.desc, rangeInfo.token, err = ds.getDescriptor(ctx, rangeInfo.rs.Key, nil, false)
ri, err := ds.getRoutingInfo(ctx, rangeInfo.rs.Key, EvictionToken{}, false)
if err != nil {
log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err)
continue
}
rangeInfo.token = ri
}
// Establish a RangeFeed for a single Range.
maxTS, err := ds.singleRangeFeed(ctx, span, ts, withDiff, rangeInfo.desc, eventCh)
maxTS, err := ds.singleRangeFeed(ctx, span, ts, withDiff, rangeInfo.token.Desc(), eventCh)
// Forward the timestamp in case we end up sending it again.
ts.Forward(maxTS)
......@@ -168,7 +167,7 @@ func (ds *DistSender) partialRangeFeed(
case errors.HasType(err, (*sendError)(nil)), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)):
// Evict the descriptor from the cache and reload on next attempt.
rangeInfo.token.Evict(ctx)
rangeInfo.desc = nil
rangeInfo.token = EvictionToken{}
continue
case errors.HasType(err, (*roachpb.RangeKeyMismatchError)(nil)):
// Evict the descriptor from the cache.
......@@ -230,7 +229,7 @@ func (ds *DistSender) singleRangeFeed(
if ds.rpcContext != nil {
latencyFn = ds.rpcContext.RemoteClocks.Latency
}
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc)
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil /* leaseholder */)
if err != nil {
return args.Timestamp, err
}
......
This diff is collapsed.
// Copyright 2015 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kvcoord
import (
"context"
"runtime"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)
var (
defaultShards = 2 * runtime.NumCPU()
)
// A LeaseHolderCache is a cache of leaseholder store IDs keyed by range ID.
type LeaseHolderCache struct {
shards []LeaseHolderCacheShard
}
// A LeaseHolderCacheShard is one shard of the cache of leaseholder store IDs
// keyed by range ID.
type LeaseHolderCacheShard struct {
// NB: This can't be a RWMutex for lookup because UnorderedCache.Get
// manipulates an internal LRU list.
mu syncutil.Mutex
cache *cache.UnorderedCache
}
// NewLeaseHolderCache creates a new LeaseHolderCache of the given size.
// The underlying cache internally uses a hash map, so lookups
// are cheap.
func NewLeaseHolderCache(size func() int64) *LeaseHolderCache {
leaseholderCache := &LeaseHolderCache{}
leaseholderCache.shards = make([]LeaseHolderCacheShard, defaultShards)
for i := range leaseholderCache.shards {
val := &leaseholderCache.shards[i]
val.cache = cache.NewUnorderedCache(cache.Config{
Policy: cache.CacheLRU,
ShouldEvict: func(s int, key, value interface{}) bool {
return int64(s) > size()/int64(defaultShards)
},
})
}
return leaseholderCache
}
// Lookup returns the cached leaseholder store for the given range ID.
func (lc *LeaseHolderCache) Lookup(
ctx context.Context, rangeID roachpb.RangeID,
) (roachpb.StoreID, bool) {
ld := &lc.shards[int(rangeID)%len(lc.shards)]
ld.mu.Lock()
defer ld.mu.Unlock()
if v, ok := ld.cache.Get(rangeID); ok {
if log.V(2) {
log.Infof(ctx, "r%d: lookup leaseholder: %s", rangeID, v)
}
return v.(roachpb.StoreID), true
}
if log.V(2) {
log.Infof(ctx, "r%d: lookup leaseholder: not found", rangeID)
}
return 0, false
}
// Update invalidates the cached leaseholder for the given range ID. If a zero
// store ID is passed, the cached leaseholder is evicted. Otherwise, the
// passed-in store ID is cached.
func (lc *LeaseHolderCache) Update(
ctx context.Context, rangeID roachpb.RangeID, storeID roachpb.StoreID,
) {
ld := &lc.shards[int(rangeID)%len(lc.shards)]
ld.mu.Lock()
defer ld.mu.Unlock()
if storeID == 0 {
if log.V(2) {
log.Infof(ctx, "r%d: evicting leaseholder", rangeID)
}
ld.cache.Del(rangeID)
} else {
if log.V(2) {
log.Infof(ctx, "r%d: updating leaseholder: %d", rangeID, storeID)
}
ld.cache.Add(rangeID, storeID)
}
}
// Copyright 2015 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kvcoord
import (
"context"
"testing"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)
func staticSize(size int64) func() int64 {
return func() int64 {
return size
}
}
func TestLeaseHolderCache(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
cacheSize := (1 << 4) * defaultShards
lc := NewLeaseHolderCache(staticSize(int64(cacheSize)))
if repStoreID, ok := lc.Lookup(ctx, 12); ok {
t.Errorf("lookup of missing key returned: %d", repStoreID)
}
rangeID := roachpb.RangeID(5)
replicaStoreID := roachpb.StoreID(1)
lc.Update(ctx, rangeID, replicaStoreID)
if repStoreID, ok := lc.Lookup(ctx, rangeID); !ok {
t.Fatalf("expected StoreID %d", replicaStoreID)
} else if repStoreID != replicaStoreID {
t.Errorf("expected StoreID %d, got %d", replicaStoreID, repStoreID)
}
newReplicaStoreID := roachpb.StoreID(7)
lc.Update(ctx, rangeID, newReplicaStoreID)
if repStoreID, ok := lc.Lookup(ctx, rangeID); !ok {
t.Fatalf("expected StoreID %d", replicaStoreID)
} else if repStoreID != newReplicaStoreID {
t.Errorf("expected StoreID %d, got %d", newReplicaStoreID, repStoreID)
}
lc.Update(ctx, rangeID, roachpb.StoreID(0))
if repStoreID, ok := lc.Lookup(ctx, rangeID); ok {
t.Errorf("lookup of evicted key returned: %d", repStoreID)
}
for i := 10; i < 10+cacheSize+2; i++ {
lc.Update(ctx, roachpb.RangeID(i), replicaStoreID)
}
_, ok11 := lc.Lookup(ctx, 11)
_, ok12 := lc.Lookup(ctx, 12)
if ok11 || !ok12 {
t.Fatalf("unexpected policy used in cache : %v, %v", ok11, ok12)
}
}
func BenchmarkLeaseHolderCacheParallel(b *testing.B) {
defer leaktest.AfterTest(b)()
ctx := context.Background()
cacheSize := (1 << 4) * defaultShards
lc := NewLeaseHolderCache(staticSize(int64(cacheSize)))
numRanges := 2 * len(lc.shards)
for i := 1; i <= numRanges; i++ {
rangeID := roachpb.RangeID(i)
lc.Update(ctx, rangeID, roachpb.StoreID(i))
}
b.RunParallel(func(pb *testing.PB) {
var n int
for pb.Next() {
rangeID := roachpb.RangeID(n%numRanges + 1)
n++
if _, ok := lc.Lookup(ctx, rangeID); !ok {
b.Fatalf("r%d: should be found in the cache", rangeID)
}
}
})
}
This diff is collapsed.
This diff is collapsed.
......@@ -28,10 +28,10 @@ type RangeIterator struct {
ds *DistSender
scanDir ScanDirection
key roachpb.RKey
desc *roachpb.RangeDescriptor
token *EvictionToken
init bool
err error
// token represents the results of the latest cache lookup.
token EvictionToken
init bool
err error
}
// NewRangeIterator creates a new RangeIterator.
......@@ -62,16 +62,32 @@ func (ri *RangeIterator) Key() roachpb.RKey {
// Desc returns the descriptor of the range at which the iterator is
// currently positioned. The iterator must be valid.
//
// The returned descriptor is immutable.
func (ri *RangeIterator) Desc() *roachpb.RangeDescriptor {
if !ri.Valid() {
panic(ri.Error())
}
return ri.desc
return ri.token.Desc()
}
// Lease returns information about the lease of the range at which the iterator
// is currently positioned. The iterator must be valid.
//
// The lease information comes from a cache, and so it can be stale. Returns nil
// if no lease information is known.
//
// The returned lease is immutable.
func (ri *RangeIterator) Lease() *roachpb.Lease {
if !ri.Valid() {
panic(ri.Error())
}
return ri.token.Lease()
}
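// Illustrative usage sketch (assuming the DistSender is the constructor's only
// argument; startKey and rs are hypothetical): the usual pattern for walking
// the ranges of a span:
//
//   ri := NewRangeIterator(ds)
//   for ri.Seek(ctx, startKey, Ascending); ri.Valid(); ri.Next(ctx) {
//       desc := ri.Desc()   // immutable cached descriptor
//       lease := ri.Lease() // possibly nil or stale
//       _, _ = desc, lease
//       if !ri.NeedAnother(rs) {
//           break
//       }
//   }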
// Token returns the eviction token corresponding to the range
// descriptor for the current iteration. The iterator must be valid.
func (ri *RangeIterator) Token() *EvictionToken {
func (ri *RangeIterator) Token() EvictionToken {
if !ri.Valid() {
panic(ri.Error())
}
......@@ -89,9 +105,9 @@ func (ri *RangeIterator) NeedAnother(rs roachpb.RSpan) bool {
panic("NeedAnother() undefined for spans representing a single key")
}
if ri.scanDir == Ascending {
return ri.desc.EndKey.Less(rs.EndKey)
return ri.Desc().EndKey.Less(rs.EndKey)
}
return rs.Key.Less(ri.desc.StartKey)
return rs.Key.Less(ri.Desc().StartKey)
}
// Valid returns whether the iterator is valid. To be valid, the
......@@ -127,9 +143,9 @@ func (ri *RangeIterator) Next(ctx context.Context) {
}
// Determine next span when the current range is subtracted.
if ri.scanDir == Ascending {
ri.Seek(ctx, ri.desc.EndKey, ri.scanDir)
ri.Seek(ctx, ri.Desc().EndKey, ri.scanDir)
} else {
ri.Seek(ctx, ri.desc.StartKey, ri.scanDir)
ri.Seek(ctx, ri.Desc().StartKey, ri.scanDir)
}
}
......@@ -156,15 +172,13 @@ func (ri *RangeIterator) Seek(ctx context.Context, key roachpb.RKey, scanDir Sca
// Retry loop for looking up next range in the span. The retry loop
// deals with retryable range descriptor lookups.
for r := retry.StartWithCtx(ctx, ri.ds.rpcRetryOptions); r.Next(); {
var err error
ri.desc, ri.token, err = ri.ds.getDescriptor(
ctx, ri.key, ri.token, ri.scanDir == Descending)
rngInfo, err := ri.ds.getRoutingInfo(ctx, ri.key, ri.token, ri.scanDir == Descending)
if log.V(2) {
log.Infof(ctx, "key: %s, desc: %s err: %v", ri.key, ri.desc, err)
log.Infof(ctx, "key: %s, desc: %s err: %v", ri.key, rngInfo.Desc(), err)
}
// getDescriptor may fail retryably if, for example, the first
// getRoutingInfo may fail retryably if, for example, the first
// range isn't available via Gossip. Assume that all errors at
// this level are retryable. Non-retryable errors would be for
// things like malformed requests which we should have checked
......@@ -174,8 +188,9 @@ func (ri *RangeIterator) Seek(ctx context.Context, key roachpb.RKey, scanDir Sca
continue
}
ri.token = rngInfo
if log.V(2) {
log.Infof(ctx, "returning; key: %s, desc: %s", ri.key, ri.desc)
log.Infof(ctx, "returning; key: %s, desc: %s", ri.key, ri.Desc())
}
return
}
......
......@@ -43,15 +43,47 @@ type ReplicaSlice []ReplicaInfo
// descriptor and using gossip to look up node descriptors. Replicas on nodes
// that are not gossiped are omitted from the result.
//
// Generally, only voting replicas are returned. However, if a non-nil
// leaseholder is passed in, it will be included in the result even if the
// descriptor has it as a learner (we assert that the leaseholder is part of the
// descriptor). The idea is that the descriptor might be stale and list the
// leaseholder as a learner erroneously, and lease info is a strong signal in
// that direction. Note that the returned ReplicaSlice might still not include
// the leaseholder if info for the respective node is missing from the
// NodeDescStore.
//
// If there's no info in gossip for any of the nodes in the descriptor, a
// sendError is returned.
func NewReplicaSlice(
ctx context.Context, nodeDescs NodeDescStore, desc *roachpb.RangeDescriptor,
ctx context.Context,
nodeDescs NodeDescStore,
desc *roachpb.RangeDescriptor,
leaseholder *roachpb.ReplicaDescriptor,
) (ReplicaSlice, error) {
if leaseholder != nil {
if _, ok := desc.GetReplicaDescriptorByID(leaseholder.ReplicaID); !ok {
log.Fatalf(ctx, "leaseholder not in descriptor; leaseholder: %s, desc: %s", leaseholder, desc)
}
}
// Learner replicas won't serve reads/writes, so we'll send only to the
// `Voters` replicas. This is just an optimization to save a network hop;
// everything would still work if we had `All` here.
voters := desc.Replicas().Voters()
// If we know a leaseholder, though, let's make sure we include it.
if leaseholder != nil && len(voters) < len(desc.Replicas().All()) {
found := false
for _, v := range voters {
if v == *leaseholder {
found = true
break
}
}
if !found {
log.Eventf(ctx, "the descriptor has the leaseholder as a learner; including it anyway")
voters = append(voters, *leaseholder)
}
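// Illustrative usage sketch (hypothetical caller; compare the rangefeed call
// site above, which passes nil): supplying a known leaseholder keeps it in the
// returned slice even when the descriptor still lists it as a learner:
//
//   replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholderReplica)
//   if err != nil {
//       return err
//   }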