Commit f2ec747c authored by Andrei Matei

kvclient: switch DistSender to use the unified cache

This patch finishes the transition to the combined range descriptor /
leaseholder cache. The DistSender now queries and updates the range
cache for leases, and the separate LeaseHolderCache is deleted.

The leaseholder cache never made sense as a separate cache from the
range descriptor cache, since the two need to stay coherent. A cached
leaseholder that is not part of the cached range descriptor is an
awkward situation for the code to handle. Remnants of this awkwardness
remain even with this patch (a replica can return a NotLeaseHolderError
pointing to a replica that is not in the cached descriptor), but at
least we now have a common data structure in which to address the
issue in follow-up patches. Information on the descriptor and on the
leaseholder is also always needed in tandem, so the separate data
structures were unnatural for all of their consumers.
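
To make the new shape concrete, here is a minimal sketch of the idea
(simplified, hypothetical types; not the actual rangecache API): the
unified cache stores the descriptor and the lease together in a single
entry, so one lookup yields a coherent pair.

package main

import "fmt"

// Simplified stand-ins for roachpb.RangeDescriptor and roachpb.Lease.
type rangeDescriptor struct {
	rangeID  int64
	replicas []int32 // store IDs holding a replica
}

type lease struct {
	leaseholderStore int32 // 0 means the leaseholder is unknown
}

// cacheEntry is the unit the unified cache stores: descriptor plus lease.
type cacheEntry struct {
	desc  rangeDescriptor
	lease lease
}

func main() {
	cache := map[int64]cacheEntry{
		5: {
			desc:  rangeDescriptor{rangeID: 5, replicas: []int32{1, 2, 3}},
			lease: lease{leaseholderStore: 2},
		},
	}
	// One lookup returns the descriptor and the leaseholder together, so
	// the two cannot drift apart the way two independent caches could.
	e := cache[5]
	fmt.Println(e.desc.replicas, e.lease.leaseholderStore)
}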

Release note: None
parent fd5f9b84
This diff is collapsed.
......@@ -28,7 +28,6 @@ import (
)
type singleRangeInfo struct {
desc *roachpb.RangeDescriptor
rs roachpb.RSpan
ts hlc.Timestamp
token EvictionToken
......@@ -106,7 +105,6 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges(
nextRS.Key = partialRS.EndKey
select {
case rangeCh <- singleRangeInfo{
desc: desc,
rs: partialRS,
ts: ts,
token: ri.Token(),
......@@ -139,19 +137,18 @@ func (ds *DistSender) partialRangeFeed(
// Start a retry loop for sending the batch to the range.
for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); {
// If we've cleared the descriptor on a send failure, re-lookup.
if rangeInfo.desc == nil {
if rangeInfo.token.Empty() {
var err error
ri, tok, err := ds.getDescriptor(ctx, rangeInfo.rs.Key, EvictionToken{}, false)
ri, err := ds.getRoutingInfo(ctx, rangeInfo.rs.Key, EvictionToken{}, false)
if err != nil {
log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err)
continue
}
rangeInfo.desc = &ri.Desc
rangeInfo.token = tok
rangeInfo.token = ri
}
// Establish a RangeFeed for a single Range.
maxTS, err := ds.singleRangeFeed(ctx, span, ts, withDiff, rangeInfo.desc, eventCh)
maxTS, err := ds.singleRangeFeed(ctx, span, ts, withDiff, rangeInfo.token.Desc(), eventCh)
// Forward the timestamp in case we end up sending it again.
ts.Forward(maxTS)
......@@ -170,7 +167,7 @@ func (ds *DistSender) partialRangeFeed(
case errors.HasType(err, (*sendError)(nil)), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)):
// Evict the descriptor from the cache and reload on next attempt.
rangeInfo.token.Evict(ctx)
rangeInfo.desc = nil
rangeInfo.token = EvictionToken{}
continue
case errors.HasType(err, (*roachpb.RangeKeyMismatchError)(nil)):
// Evict the descriptor from the cache.
......@@ -232,7 +229,7 @@ func (ds *DistSender) singleRangeFeed(
if ds.rpcContext != nil {
latencyFn = ds.rpcContext.RemoteClocks.Latency
}
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc)
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil /* leaseholder */)
if err != nil {
return args.Timestamp, err
}
......
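
The partialRangeFeed change above is easier to see in isolation. The
following is a rough sketch of the retry pattern it now follows, using
simplified, hypothetical stand-ins (routingToken, lookupRouting, and
sendOnce are illustrations, not the DistSender API): an empty token
triggers a fresh lookup against the unified cache, and a send failure
evicts/clears the token so the next attempt re-resolves routing.

package main

import (
	"context"
	"errors"
	"fmt"
)

// routingToken stands in for the cache's EvictionToken: an empty token
// means "no cached routing info; look it up again".
type routingToken struct {
	desc string // simplified stand-in for a cached range descriptor
}

func (t routingToken) Empty() bool { return t.desc == "" }

// lookupRouting stands in for getRoutingInfo against the range cache.
func lookupRouting(ctx context.Context, key string) (routingToken, error) {
	return routingToken{desc: "range containing " + key}, nil
}

// sendOnce stands in for singleRangeFeed and may fail with a send error.
func sendOnce(ctx context.Context, tok routingToken) error {
	fmt.Println("sending rangefeed to", tok.desc)
	return nil
}

func runWithRetry(ctx context.Context, key string) error {
	var tok routingToken
	for attempt := 0; attempt < 3; attempt++ {
		if tok.Empty() {
			var err error
			if tok, err = lookupRouting(ctx, key); err != nil {
				continue // routing lookups are treated as retryable
			}
		}
		if err := sendOnce(ctx, tok); err != nil {
			// Drop the stale routing info and force a re-lookup next time,
			// mirroring token.Evict followed by clearing the token above.
			tok = routingToken{}
			continue
		}
		return nil
	}
	return errors.New("retries exhausted")
}

func main() {
	_ = runWithRetry(context.Background(), "table/52")
}
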
This diff is collapsed.
// Copyright 2015 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kvcoord
import (
"context"
"runtime"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)
var (
defaultShards = 2 * runtime.NumCPU()
)
// A LeaseHolderCache is a cache of leaseholder store IDs keyed by range ID.
type LeaseHolderCache struct {
shards []LeaseHolderCacheShard
}
// A LeaseHolderCacheShard is a single shard of the LeaseHolderCache; it caches
// leaseholder store IDs keyed by range ID.
type LeaseHolderCacheShard struct {
// NB: This can't be a RWMutex for lookup because UnorderedCache.Get
// manipulates an internal LRU list.
mu syncutil.Mutex
cache *cache.UnorderedCache
}
// NewLeaseHolderCache creates a new LeaseHolderCache of the given size.
// The underlying cache internally uses a hash map, so lookups
// are cheap.
func NewLeaseHolderCache(size func() int64) *LeaseHolderCache {
leaseholderCache := &LeaseHolderCache{}
leaseholderCache.shards = make([]LeaseHolderCacheShard, defaultShards)
for i := range leaseholderCache.shards {
val := &leaseholderCache.shards[i]
val.cache = cache.NewUnorderedCache(cache.Config{
Policy: cache.CacheLRU,
ShouldEvict: func(s int, key, value interface{}) bool {
return int64(s) > size()/int64(defaultShards)
},
})
}
return leaseholderCache
}
// Lookup returns the cached leaseholder store ID for the given range ID.
func (lc *LeaseHolderCache) Lookup(
ctx context.Context, rangeID roachpb.RangeID,
) (roachpb.StoreID, bool) {
ld := &lc.shards[int(rangeID)%len(lc.shards)]
ld.mu.Lock()
defer ld.mu.Unlock()
if v, ok := ld.cache.Get(rangeID); ok {
if log.V(2) {
log.Infof(ctx, "r%d: lookup leaseholder: %s", rangeID, v)
}
return v.(roachpb.StoreID), true
}
if log.V(2) {
log.Infof(ctx, "r%d: lookup leaseholder: not found", rangeID)
}
return 0, false
}
// Update invalidates the cached leaseholder for the given range ID. If a zero
// store ID is passed, the cached leaseholder is evicted. Otherwise, the
// passed-in store ID is cached as the leaseholder.
func (lc *LeaseHolderCache) Update(
ctx context.Context, rangeID roachpb.RangeID, storeID roachpb.StoreID,
) {
ld := &lc.shards[int(rangeID)%len(lc.shards)]
ld.mu.Lock()
defer ld.mu.Unlock()
if storeID == 0 {
if log.V(2) {
log.Infof(ctx, "r%d: evicting leaseholder", rangeID)
}
ld.cache.Del(rangeID)
} else {
if log.V(2) {
log.Infof(ctx, "r%d: updating leaseholder: %d", rangeID, storeID)
}
ld.cache.Add(rangeID, storeID)
}
}
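
For contrast with the unified design, the coherence hazard that
motivated deleting this cache can be shown with a tiny sketch
(hypothetical maps standing in for the two real caches, not actual
CockroachDB code): refreshing the descriptor in one cache does nothing
to the leaseholder entry in the other, so the cached leaseholder can
end up pointing outside the cached descriptor.

package main

import "fmt"

func main() {
	// Two independent caches, as in the pre-unification design.
	descCache := map[int64][]int32{} // range ID -> store IDs in the descriptor
	leaseCache := map[int64]int32{}  // range ID -> leaseholder store ID

	descCache[5] = []int32{1, 2, 3}
	leaseCache[5] = 3

	// The replica on store 3 is removed and the descriptor entry gets
	// refreshed, but nothing forces the leaseholder entry to follow.
	descCache[5] = []int32{1, 2, 4}

	leaseholder := leaseCache[5]
	inDesc := false
	for _, s := range descCache[5] {
		if s == leaseholder {
			inDesc = true
		}
	}
	// Prints "false": the cached leaseholder is no longer part of the
	// cached descriptor, the awkward state the commit message describes.
	fmt.Printf("cached leaseholder s%d still in cached descriptor: %v\n",
		leaseholder, inDesc)
}
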
// Copyright 2015 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kvcoord
import (
"context"
"testing"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)
func staticSize(size int64) func() int64 {
return func() int64 {
return size
}
}
func TestLeaseHolderCache(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
cacheSize := (1 << 4) * defaultShards
lc := NewLeaseHolderCache(staticSize(int64(cacheSize)))
if repStoreID, ok := lc.Lookup(ctx, 12); ok {
t.Errorf("lookup of missing key returned: %d", repStoreID)
}
rangeID := roachpb.RangeID(5)
replicaStoreID := roachpb.StoreID(1)
lc.Update(ctx, rangeID, replicaStoreID)
if repStoreID, ok := lc.Lookup(ctx, rangeID); !ok {
t.Fatalf("expected StoreID %d", replicaStoreID)
} else if repStoreID != replicaStoreID {
t.Errorf("expected StoreID %d, got %d", replicaStoreID, repStoreID)
}
newReplicaStoreID := roachpb.StoreID(7)
lc.Update(ctx, rangeID, newReplicaStoreID)
if repStoreID, ok := lc.Lookup(ctx, rangeID); !ok {
t.Fatalf("expected StoreID %d", replicaStoreID)
} else if repStoreID != newReplicaStoreID {
t.Errorf("expected StoreID %d, got %d", newReplicaStoreID, repStoreID)
}
lc.Update(ctx, rangeID, roachpb.StoreID(0))
if repStoreID, ok := lc.Lookup(ctx, rangeID); ok {
t.Errorf("lookup of evicted key returned: %d", repStoreID)
}
for i := 10; i < 10+cacheSize+2; i++ {
lc.Update(ctx, roachpb.RangeID(i), replicaStoreID)
}
_, ok11 := lc.Lookup(ctx, 11)
_, ok12 := lc.Lookup(ctx, 12)
if ok11 || !ok12 {
t.Fatalf("unexpected policy used in cache : %v, %v", ok11, ok12)
}
}
func BenchmarkLeaseHolderCacheParallel(b *testing.B) {
defer leaktest.AfterTest(b)()
ctx := context.Background()
cacheSize := (1 << 4) * defaultShards
lc := NewLeaseHolderCache(staticSize(int64(cacheSize)))
numRanges := 2 * len(lc.shards)
for i := 1; i <= numRanges; i++ {
rangeID := roachpb.RangeID(i)
lc.Update(ctx, rangeID, roachpb.StoreID(i))
}
b.RunParallel(func(pb *testing.PB) {
var n int
for pb.Next() {
rangeID := roachpb.RangeID(n%numRanges + 1)
n++
if _, ok := lc.Lookup(ctx, rangeID); !ok {
b.Fatalf("r%d: should be found in the cache", rangeID)
}
}
})
}
......@@ -235,6 +235,12 @@ func newTestDescriptorDB() *testDescriptorDB {
return db
}
func staticSize(size int64) func() int64 {
return func() int64 {
return size
}
}
func initTestDescriptorDB(t *testing.T) *testDescriptorDB {
st := cluster.MakeTestingClusterSettings()
db := newTestDescriptorDB()
......
......@@ -172,13 +172,13 @@ func (ri *RangeIterator) Seek(ctx context.Context, key roachpb.RKey, scanDir Sca
// Retry loop for looking up next range in the span. The retry loop
// deals with retryable range descriptor lookups.
for r := retry.StartWithCtx(ctx, ri.ds.rpcRetryOptions); r.Next(); {
_, tok, err := ri.ds.getDescriptor(ctx, ri.key, ri.token, ri.scanDir == Descending)
rngInfo, err := ri.ds.getRoutingInfo(ctx, ri.key, ri.token, ri.scanDir == Descending)
if log.V(2) {
log.Infof(ctx, "key: %s, desc: %s err: %v", ri.key, ri.Desc(), err)
log.Infof(ctx, "key: %s, desc: %s err: %v", ri.key, rngInfo.Desc(), err)
}
// getDescriptor may fail retryably if, for example, the first
// getRoutingInfo may fail retryably if, for example, the first
// range isn't available via Gossip. Assume that all errors at
// this level are retryable. Non-retryable errors would be for
// things like malformed requests which we should have checked
......@@ -188,7 +188,7 @@ func (ri *RangeIterator) Seek(ctx context.Context, key roachpb.RKey, scanDir Sca
continue
}
ri.token = tok
ri.token = rngInfo
if log.V(2) {
log.Infof(ctx, "returning; key: %s, desc: %s", ri.key, ri.Desc())
}
......
......@@ -43,15 +43,47 @@ type ReplicaSlice []ReplicaInfo
// descriptor and using gossip to lookup node descriptors. Replicas on nodes
// that are not gossiped are omitted from the result.
//
// Generally, only voting replicas are returned. However, if a non-nil
// leaseholder is passed in, it will be included in the result even if the
// descriptor has it as a learner (we assert that the leaseholder is part of the
// descriptor). The idea is that the descriptor might be stale and list the
// leaseholder as a learner erroneously, and lease info is a strong signal in
// that direction. Note that the returned ReplicaSlice might still not include
// the leaseholder if info for the respective node is missing from the
// NodeDescStore.
//
// If there's no info in gossip for any of the nodes in the descriptor, a
// sendError is returned.
func NewReplicaSlice(
ctx context.Context, nodeDescs NodeDescStore, desc *roachpb.RangeDescriptor,
ctx context.Context,
nodeDescs NodeDescStore,
desc *roachpb.RangeDescriptor,
leaseholder *roachpb.ReplicaDescriptor,
) (ReplicaSlice, error) {
if leaseholder != nil {
if _, ok := desc.GetReplicaDescriptorByID(leaseholder.ReplicaID); !ok {
log.Fatalf(ctx, "leaseholder not in descriptor; leaseholder: %s, desc: %s", leaseholder, desc)
}
}
// Learner replicas won't serve reads/writes, so we'll send only to the
// `Voters` replicas. This is just an optimization to save a network hop;
// everything would still work if we used `All` here.
voters := desc.Replicas().Voters()
// If we know a leaseholder, though, let's make sure we include it.
if leaseholder != nil && len(voters) < len(desc.Replicas().All()) {
found := false
for _, v := range voters {
if v == *leaseholder {
found = true
break
}
}
if !found {
log.Eventf(ctx, "the descriptor has the leaseholder as a learner; including it anyway")
voters = append(voters, *leaseholder)
}
}
rs := make(ReplicaSlice, 0, len(voters))
for _, r := range voters {
nd, err := nodeDescs.GetNodeDescriptor(r.NodeID)
......@@ -82,11 +114,10 @@ func (rs ReplicaSlice) Len() int { return len(rs) }
// Swap swaps the replicas with indexes i and j.
func (rs ReplicaSlice) Swap(i, j int) { rs[i], rs[j] = rs[j], rs[i] }
// FindReplica returns the index of the replica which matches the specified store
// ID. If no replica matches, -1 is returned.
func (rs ReplicaSlice) FindReplica(storeID roachpb.StoreID) int {
// Find returns the index of the specified ReplicaID, or -1 if missing.
func (rs ReplicaSlice) Find(id roachpb.ReplicaID) int {
for i := range rs {
if rs[i].StoreID == storeID {
if rs[i].ReplicaID == id {
return i
}
}
......
......@@ -11,6 +11,7 @@
package kvcoord
import (
"context"
"fmt"
"reflect"
"strings"
......@@ -20,8 +21,71 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
type mockNodeStore struct {
nodes []roachpb.NodeDescriptor
}
var _ NodeDescStore = &mockNodeStore{}
// GetNodeDesc is part of the NodeDescStore interface.
func (ns *mockNodeStore) GetNodeDescriptor(nodeID roachpb.NodeID) (*roachpb.NodeDescriptor, error) {
for _, nd := range ns.nodes {
if nd.NodeID == nodeID {
return &nd, nil
}
}
return nil, errors.Errorf("unable to look up descriptor for n%d", nodeID)
}
func TestNewReplicaSlice(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
rd := &roachpb.RangeDescriptor{
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1},
{NodeID: 2, StoreID: 2},
{NodeID: 3, StoreID: 3},
},
}
ns := &mockNodeStore{
nodes: []roachpb.NodeDescriptor{
{
NodeID: 1,
Address: util.UnresolvedAddr{},
},
{
NodeID: 2,
Address: util.UnresolvedAddr{},
},
{
NodeID: 3,
Address: util.UnresolvedAddr{},
},
},
}
rs, err := NewReplicaSlice(ctx, ns, rd, nil /* leaseholder */)
require.NoError(t, err)
require.Equal(t, 3, rs.Len())
// Check that learners are not included.
typLearner := roachpb.LEARNER
rd.InternalReplicas[2].Type = &typLearner
rs, err = NewReplicaSlice(ctx, ns, rd, nil /* leaseholder */)
require.NoError(t, err)
require.Equal(t, 2, rs.Len())
// Check that, if the leaseholder points to a learner, that learner is
// included.
leaseholder := &roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3}
rs, err = NewReplicaSlice(ctx, ns, rd, leaseholder)
require.NoError(t, err)
require.Equal(t, 3, rs.Len())
}
func getStores(rs ReplicaSlice) (r []roachpb.StoreID) {
for i := range rs {
r = append(r, rs[i].StoreID)
......
......@@ -270,6 +270,8 @@ func sendBatch(
g := makeGossip(t, stopper, rpcContext)
desc := new(roachpb.RangeDescriptor)
desc.StartKey = roachpb.RKeyMin
desc.EndKey = roachpb.RKeyMax
for i, addr := range addrs {
nd := &roachpb.NodeDescriptor{
NodeID: roachpb.NodeID(i + 1),
......@@ -297,6 +299,12 @@ func sendBatch(
TransportFactory: transportFactory,
},
})
ds.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: *desc,
Lease: roachpb.Lease{},
})
routing, err := ds.getRoutingInfo(ctx, desc.StartKey, EvictionToken{}, false /* useReverseScan */)
require.NoError(t, err)
return ds.sendToReplicas(ctx, roachpb.BatchRequest{}, desc, false /* withCommit */)
return ds.sendToReplicas(ctx, roachpb.BatchRequest{}, routing, false /* withCommit */)
}
......@@ -250,7 +250,7 @@ func TestPlanningDuringSplitsAndMerges(t *testing.T) {
}
// Test that DistSQLReceiver uses inbound metadata to update the
// RangeDescriptorCache and the LeaseHolderCache.
// RangeDescriptorCache.
func TestDistSQLReceiverUpdatesCaches(t *testing.T) {
defer leaktest.AfterTest(t)()
......@@ -511,7 +511,6 @@ func TestDistSQLDeadHosts(t *testing.T) {
func TestDistSQLDrainingHosts(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Skip("!!! skip until following commit that switches DistSender to update the combined cache")
const numNodes = 2
tc := serverutils.StartTestCluster(
......
......@@ -653,7 +653,8 @@ func TestEvalCtxTxnOnRemoteNodes(t *testing.T) {
require.NoError(t, err)
// Query again just in case the previous query executed on the gateway
// because the leaseholder cache wasn't populated and we fooled ourselves.
// because we didn't have a leaseholder in the cache and we fooled
// ourselves.
_, err = db.Exec("SELECT cluster_logical_timestamp() FROM t")
require.NoError(t, err)
})
......
......@@ -255,7 +255,7 @@ func (o *binPackingOracle) ChoosePreferredReplica(
func replicaSliceOrErr(
ctx context.Context, nodeDescs kvcoord.NodeDescStore, desc *roachpb.RangeDescriptor,
) (kvcoord.ReplicaSlice, error) {
replicas, err := kvcoord.NewReplicaSlice(ctx, nodeDescs, desc)
replicas, err := kvcoord.NewReplicaSlice(ctx, nodeDescs, desc, nil /* leaseholder */)
if err != nil {
return kvcoord.ReplicaSlice{}, sqlbase.NewRangeUnavailableError(desc.RangeID, err)
}
......
......@@ -102,8 +102,8 @@ func TestSpanResolverUsesCaches(t *testing.T) {
})
}
// Resolve the spans. Since the LeaseHolderCache is empty, all the ranges
// should be grouped and "assigned" to replica 0.
// Resolve the spans. Since the range descriptor cache doesn't have any
// leases, all the ranges should be grouped and "assigned" to replica 0.
replicas, err := resolveSpans(context.Background(), lr.NewSpanResolverIterator(nil), spans...)
if err != nil {
t.Fatal(err)
......