Commit a102b141 authored by Nathan VanBenschoten's avatar Nathan VanBenschoten

config: fix GetLargestObjectID for pseudo object IDs

This was not behaving correctly, and was breaking things when a
pseudo table had the largest ID below MaxReservedDescID.
parent 01856bb7
......@@ -171,9 +171,10 @@ func (s *SystemConfig) GetIndex(key roachpb.Key) (int, bool) {
}
// GetLargestObjectID returns the largest object ID found in the config which is
// less than or equal to maxID. If maxID is 0, returns the largest ID in the
// config.
func (s *SystemConfig) GetLargestObjectID(maxID uint32) (uint32, error) {
// less than or equal to maxID. The objects in the config are augmented with the
// provided pseudo IDs. If maxID is 0, returns the largest ID in the config
// (again, augmented by the pseudo IDs).
func (s *SystemConfig) GetLargestObjectID(maxID uint32, pseudoIDs []uint32) (uint32, error) {
testingLock.Lock()
hook := testingLargestIDHook
testingLock.Unlock()
......@@ -190,28 +191,38 @@ func (s *SystemConfig) GetLargestObjectID(maxID uint32) (uint32, error) {
lowIndex := sort.Search(len(s.Values), func(i int) bool {
return bytes.Compare(s.Values[i].Key, lowBound) >= 0
})
if highIndex == lowIndex {
return 0, fmt.Errorf("descriptor table not found in system config of %d values", len(s.Values))
}
// Determine the largest pseudo table ID equal to or below maxID.
maxPseudoID := uint32(0)
for _, id := range pseudoIDs {
if id > maxPseudoID && (maxID == 0 || id <= maxID) {
maxPseudoID = id
}
}
// No maximum specified; maximum ID is the last entry in the descriptor
// table.
// table or the largest pseudo ID, whichever is larger.
if maxID == 0 {
id, err := keys.TODOSQLCodec.DecodeDescMetadataID(s.Values[highIndex-1].Key)
if err != nil {
return 0, err
}
return uint32(id), nil
if id < maxPseudoID {
id = maxPseudoID
}
return id, nil
}
// Maximum specified: need to search the descriptor table. Binary search
// through all descriptor table values to find the first descriptor with ID
// >= maxID.
// >= maxID and pick either it or maxPseudoID, whichever is larger.
searchSlice := s.Values[lowIndex:highIndex]
var err error
maxIdx := sort.Search(len(searchSlice), func(i int) bool {
var id uint64
var id uint32
id, err = keys.TODOSQLCodec.DecodeDescMetadataID(searchSlice[i].Key)
if err != nil {
return false
......@@ -243,7 +254,10 @@ func (s *SystemConfig) GetLargestObjectID(maxID uint32) (uint32, error) {
if err != nil {
return 0, err
}
return uint32(id), nil
if id < maxPseudoID {
id = maxPseudoID
}
return id, nil
}
// GetZoneConfigForKey looks up the zone config for the object (table
......@@ -487,7 +501,7 @@ func (s *SystemConfig) ComputeSplitKey(startKey, endKey roachpb.RKey) (rr roachp
// If the startKey falls within the non-system reserved range, compute those
// keys first.
if startID <= keys.MaxReservedDescID {
endID, err := s.GetLargestObjectID(keys.MaxReservedDescID)
endID, err := s.GetLargestObjectID(keys.MaxReservedDescID, keys.PseudoTableIDs)
if err != nil {
log.Errorf(context.TODO(), "unable to determine largest reserved object ID from system config: %s", err)
return nil
......@@ -499,7 +513,7 @@ func (s *SystemConfig) ComputeSplitKey(startKey, endKey roachpb.RKey) (rr roachp
}
// Find the split key in the user space.
endID, err := s.GetLargestObjectID(0)
endID, err := s.GetLargestObjectID(0, keys.PseudoTableIDs)
if err != nil {
log.Errorf(context.TODO(), "unable to determine largest object ID from system config: %s", err)
return nil
......
......@@ -126,28 +126,29 @@ func TestGetLargestID(t *testing.T) {
defer leaktest.AfterTest(t)()
type testCase struct {
values []roachpb.KeyValue
largest uint32
maxID uint32
errStr string
values []roachpb.KeyValue
largest uint32
maxID uint32
pseudoIDs []uint32
errStr string
}
testCases := []testCase{
// No data.
{nil, 0, 0, "descriptor table not found"},
{nil, 0, 0, nil, "descriptor table not found"},
// Some data, but not from the system span.
{[]roachpb.KeyValue{plainKV("a", "b")}, 0, 0, "descriptor table not found"},
{[]roachpb.KeyValue{plainKV("a", "b")}, 0, 0, nil, "descriptor table not found"},
// Some real data, but no descriptors.
{[]roachpb.KeyValue{
sqlKV(keys.NamespaceTableID, 1, 1),
sqlKV(keys.NamespaceTableID, 1, 2),
sqlKV(keys.UsersTableID, 1, 3),
}, 0, 0, "descriptor table not found"},
}, 0, 0, nil, "descriptor table not found"},
// Single correct descriptor entry.
{[]roachpb.KeyValue{sqlKV(keys.DescriptorTableID, 1, 1)}, 1, 0, ""},
{[]roachpb.KeyValue{sqlKV(keys.DescriptorTableID, 1, 1)}, 1, 0, nil, ""},
// Surrounded by other data.
{[]roachpb.KeyValue{
......@@ -155,7 +156,7 @@ func TestGetLargestID(t *testing.T) {
sqlKV(keys.NamespaceTableID, 1, 30),
sqlKV(keys.DescriptorTableID, 1, 8),
sqlKV(keys.ZonesTableID, 1, 40),
}, 8, 0, ""},
}, 8, 0, nil, ""},
// Descriptors with holes. Index ID does not matter.
{[]roachpb.KeyValue{
......@@ -163,7 +164,7 @@ func TestGetLargestID(t *testing.T) {
sqlKV(keys.DescriptorTableID, 2, 5),
sqlKV(keys.DescriptorTableID, 3, 8),
sqlKV(keys.DescriptorTableID, 4, 12),
}, 12, 0, ""},
}, 12, 0, nil, ""},
// Real SQL layout.
func() testCase {
......@@ -171,7 +172,8 @@ func TestGetLargestID(t *testing.T) {
descIDs := ms.DescriptorIDs()
maxDescID := descIDs[len(descIDs)-1]
kvs, _ /* splits */ := ms.GetInitialValues()
return testCase{kvs, uint32(maxDescID), 0, ""}
pseudoIDs := keys.PseudoTableIDs
return testCase{kvs, uint32(maxDescID), 0, pseudoIDs, ""}
}(),
// Test non-zero max.
......@@ -180,7 +182,7 @@ func TestGetLargestID(t *testing.T) {
sqlKV(keys.DescriptorTableID, 2, 5),
sqlKV(keys.DescriptorTableID, 3, 8),
sqlKV(keys.DescriptorTableID, 4, 12),
}, 8, 8, ""},
}, 8, 8, nil, ""},
// Test non-zero max.
{[]roachpb.KeyValue{
......@@ -188,13 +190,35 @@ func TestGetLargestID(t *testing.T) {
sqlKV(keys.DescriptorTableID, 2, 5),
sqlKV(keys.DescriptorTableID, 3, 8),
sqlKV(keys.DescriptorTableID, 4, 12),
}, 5, 7, ""},
}, 5, 7, nil, ""},
// Test pseudo ID (MetaRangesID = 16), exact.
{[]roachpb.KeyValue{
sqlKV(keys.DescriptorTableID, 1, 1),
sqlKV(keys.DescriptorTableID, 4, 12),
sqlKV(keys.DescriptorTableID, 4, 19),
sqlKV(keys.DescriptorTableID, 4, 22),
}, 16, 16, []uint32{16, 17, 18}, ""},
// Test pseudo ID (TimeseriesRangesID = 18), above.
{[]roachpb.KeyValue{
sqlKV(keys.DescriptorTableID, 1, 1),
sqlKV(keys.DescriptorTableID, 4, 12),
sqlKV(keys.DescriptorTableID, 4, 21),
sqlKV(keys.DescriptorTableID, 4, 22),
}, 18, 20, []uint32{16, 17, 18}, ""},
// Test pseudo ID (TimeseriesRangesID = 18), largest.
{[]roachpb.KeyValue{
sqlKV(keys.DescriptorTableID, 1, 1),
sqlKV(keys.DescriptorTableID, 4, 12),
}, 18, 0, []uint32{16, 17, 18}, ""},
}
cfg := config.NewSystemConfig(zonepb.DefaultZoneConfigRef())
for tcNum, tc := range testCases {
cfg.Values = tc.values
ret, err := cfg.GetLargestObjectID(tc.maxID)
ret, err := cfg.GetLargestObjectID(tc.maxID, tc.pseudoIDs)
if !testutils.IsError(err, tc.errStr) {
t.Errorf("#%d: expected err=%q, got %v", tcNum, tc.errStr, err)
continue
......
......@@ -217,7 +217,7 @@ func (d sqlDecoder) DecodeIndexPrefix(key roachpb.Key) ([]byte, uint32, uint32,
}
// DecodeDescMetadataID decodes a descriptor ID from a descriptor metadata key.
func (d sqlDecoder) DecodeDescMetadataID(key roachpb.Key) (uint64, error) {
func (d sqlDecoder) DecodeDescMetadataID(key roachpb.Key) (uint32, error) {
// Extract table and index ID from key.
remaining, tableID, _, err := d.DecodeIndexPrefix(key)
if err != nil {
......@@ -231,5 +231,5 @@ func (d sqlDecoder) DecodeDescMetadataID(key roachpb.Key) (uint64, error) {
if err != nil {
return 0, err
}
return id, nil
return uint32(id), nil
}
......@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
......@@ -389,7 +390,8 @@ func (v *constraintConformanceVisitor) reset(ctx context.Context) {
// zones that have constraints. Otherwise, just iterating through the ranges
// wouldn't create entries for constraints that aren't violated, and
// definitely not for zones that don't apply to any ranges.
maxObjectID, err := v.cfg.GetLargestObjectID(0 /* maxID - return the largest ID in the config */)
maxObjectID, err := v.cfg.GetLargestObjectID(
0 /* maxID - return the largest ID in the config */, keys.PseudoTableIDs)
if err != nil {
log.Fatalf(ctx, "unexpected failure to compute max object id: %s", err)
}
......
......@@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
......@@ -298,7 +299,9 @@ func (v *replicationStatsVisitor) reset(ctx context.Context) {
// Iterate through all the zone configs to create report entries for all the
// zones that have constraints. Otherwise, just iterating through the ranges
// wouldn't create entries for zones that don't apply to any ranges.
maxObjectID, err := v.cfg.GetLargestObjectID(0 /* maxID - return the largest ID in the config */)
maxObjectID, err := v.cfg.GetLargestObjectID(
0 /* maxID - return the largest ID in the config */, keys.PseudoTableIDs,
)
if err != nil {
log.Fatalf(ctx, "unexpected failure to compute max object id: %s", err)
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment