Commit 61f2787d authored by craig[bot]

Merge #50755 #50782

50755: storage,ccl: Plumbs session user information to the ExternalStorage r=miretskiy a=adityamaru

The new UserFileTableSystem (#50493) needs knowledge of the user
interacting with it to check access privileges on the underlying user
scoped tables.  These interactions are most notably via IMPORT, BACKUP
and CHANGEFEED jobs and processors.

This change plumbs the session user string from these various entities
via the factory methods used to create the ExternalStorage objects.  It
also passes an internal executor and kvdb from server.Start(), which
are required by the FileTableSystem to execute its user scoped SQL
queries.

This PR will be followed up by the implementation of the FileTable
ExternalStorage, which will supply the user string, internal executor
and kvDB to the UserFileTableSystem.

Informs: #47211

Release note: None

50782: geo: minor fix ups to ST_Summary r=sumeerbhola a=otan

Minor fixes to ST_Summary to make it more PostGIS like:
* Correct ordering of the symbols.
* Fix bounding boxes not being reported in sub structures.

Release note: None
Co-authored-by: Aditya Maru <[email protected]>
Co-authored-by: Oliver Tan <[email protected]>
......@@ -430,7 +430,7 @@ func (b *backupResumer) Resume(
}
// For all backups, partitioned or not, the main BACKUP manifest is stored at
// details.URI.
defaultConf, err := cloudimpl.ExternalStorageConfFromURI(details.URI)
defaultConf, err := cloudimpl.ExternalStorageConfFromURI(details.URI, p.User())
if err != nil {
return errors.Wrapf(err, "export configuration")
}
......@@ -440,7 +440,7 @@ func (b *backupResumer) Resume(
}
storageByLocalityKV := make(map[string]*roachpb.ExternalStorage)
for kv, uri := range details.URIsByLocalityKV {
conf, err := cloudimpl.ExternalStorageConfFromURI(uri)
conf, err := cloudimpl.ExternalStorageConfFromURI(uri, p.User())
if err != nil {
return err
}
......@@ -497,7 +497,7 @@ func (b *backupResumer) Resume(
if err != nil {
log.Warningf(ctx, "unable to clear stats from job payload: %+v", err)
}
b.deleteCheckpoint(ctx, p.ExecCfg())
b.deleteCheckpoint(ctx, p.ExecCfg(), p.User())
if ptsID != nil && !b.testingKnobs.ignoreProtectedTimestamps {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
......@@ -566,20 +566,23 @@ func (b *backupResumer) OnFailOrCancel(ctx context.Context, phs interface{}) err
telemetry.CountBucketed("backup.duration-sec.failed",
int64(timeutil.Since(timeutil.FromUnixMicros(b.job.Payload().StartedMicros)).Seconds()))
cfg := phs.(sql.PlanHookState).ExecCfg()
b.deleteCheckpoint(ctx, cfg)
p := phs.(sql.PlanHookState)
cfg := p.ExecCfg()
b.deleteCheckpoint(ctx, cfg, p.User())
return cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return b.releaseProtectedTimestamp(ctx, txn, cfg.ProtectedTimestampProvider)
})
}
func (b *backupResumer) deleteCheckpoint(ctx context.Context, cfg *sql.ExecutorConfig) {
func (b *backupResumer) deleteCheckpoint(
ctx context.Context, cfg *sql.ExecutorConfig, user string,
) {
// Attempt to delete BACKUP-CHECKPOINT.
if err := func() error {
details := b.job.Details().(jobspb.BackupDetails)
// For all backups, partitioned or not, the main BACKUP manifest is stored at
// details.URI.
conf, err := cloudimpl.ExternalStorageConfFromURI(details.URI)
conf, err := cloudimpl.ExternalStorageConfFromURI(details.URI, user)
if err != nil {
return err
}
......
......@@ -383,7 +383,7 @@ func backupPlanHook(
if err != nil {
return err
}
defaultStore, err := makeCloudStorage(ctx, defaultURI)
defaultStore, err := makeCloudStorage(ctx, defaultURI, p.User())
if err != nil {
return err
}
......@@ -399,7 +399,7 @@ func backupPlanHook(
g := ctxgroup.WithContext(ctx)
if len(incrementalFrom) > 0 {
if encryptionPassphrase != nil {
exportStore, err := makeCloudStorage(ctx, incrementalFrom[0])
exportStore, err := makeCloudStorage(ctx, incrementalFrom[0], p.User())
if err != nil {
return err
}
......@@ -424,7 +424,7 @@ func backupPlanHook(
// descriptors around.
uri := incrementalFrom[i]
desc, err := ReadBackupManifestFromURI(
ctx, uri, makeCloudStorage, encryption,
ctx, uri, p.User(), makeCloudStorage, encryption,
)
if err != nil {
return errors.Wrapf(err, "failed to read backup from %q", uri)
......@@ -494,7 +494,7 @@ func backupPlanHook(
// Close the old store before overwriting the reference with the new
// subdir store.
defaultStore.Close()
defaultStore, err = makeCloudStorage(ctx, defaultURI)
defaultStore, err = makeCloudStorage(ctx, defaultURI, p.User())
if err != nil {
return errors.Wrap(err, "re-opening layer-specific destination location")
}
......@@ -555,6 +555,7 @@ func backupPlanHook(
prevBackups,
nil, /*backupLocalityInfo*/
keys.MinKey,
p.User(),
func(span covering.Range, start, end hlc.Timestamp) error {
if (start == hlc.Timestamp{}) {
newSpans = append(newSpans, roachpb.Span{Key: span.Start, EndKey: span.End})
......@@ -608,6 +609,7 @@ func backupPlanHook(
append(prevBackups, backupManifest),
nil, /*backupLocalityInfo*/
keys.MinKey,
p.User(),
errOnMissingRange,
); err != nil {
return err
......@@ -632,7 +634,7 @@ func backupPlanHook(
if err != nil {
return err
}
exportStore, err := makeCloudStorage(ctx, defaultURI)
exportStore, err := makeCloudStorage(ctx, defaultURI, p.User())
if err != nil {
return err
}
......
......@@ -114,7 +114,7 @@ func runBackupProcessor(
// For all backups, partitioned or not, the main BACKUP manifest is stored at
// details.URI.
defaultConf, err := cloudimpl.ExternalStorageConfFromURI(spec.DefaultURI)
defaultConf, err := cloudimpl.ExternalStorageConfFromURI(spec.DefaultURI, spec.User)
if err != nil {
return errors.Wrapf(err, "export configuration")
}
......@@ -124,7 +124,7 @@ func runBackupProcessor(
}
storageByLocalityKV := make(map[string]*roachpb.ExternalStorage)
for kv, uri := range spec.URIsByLocalityKV {
conf, err := cloudimpl.ExternalStorageConfFromURI(uri)
conf, err := cloudimpl.ExternalStorageConfFromURI(uri, spec.User)
if err != nil {
return err
}
......
......@@ -60,6 +60,7 @@ func distBackup(
mvccFilter,
encryption,
startTime, endTime,
phs.User(),
)
if err != nil {
return err
......@@ -132,6 +133,7 @@ func makeBackupDataProcessorSpecs(
mvccFilter roachpb.MVCCFilter,
encryption *roachpb.FileEncryptionOptions,
startTime, endTime hlc.Timestamp,
user string,
) (map[roachpb.NodeID]*execinfrapb.BackupDataSpec, error) {
var spanPartitions []sql.SpanPartition
var introducedSpanPartitions []sql.SpanPartition
......@@ -162,6 +164,7 @@ func makeBackupDataProcessorSpecs(
PKIDs: pkIDs,
BackupStartTime: startTime,
BackupEndTime: endTime,
User: user,
}
nodeToSpec[partition.Node] = spec
}
......@@ -182,6 +185,7 @@ func makeBackupDataProcessorSpecs(
PKIDs: pkIDs,
BackupStartTime: startTime,
BackupEndTime: endTime,
User: user,
}
nodeToSpec[partition.Node] = spec
}
......
......@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
"github.com/cockroachdb/cockroach/pkg/ccl/importccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl/sampledataccl"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/workload/bank"
"github.com/cockroachdb/cockroach/pkg/workload/workloadsql"
......@@ -111,7 +112,8 @@ func BenchmarkLoadRestore(b *testing.B) {
b.SetBytes(int64(buf.Len() / b.N))
ts := hlc.Timestamp{WallTime: hlc.UnixNano()}
b.ResetTimer()
if _, err := importccl.Load(ctx, sqlDB.DB.(*gosql.DB), buf, "data", dir, ts, 0, dir, dir); err != nil {
if _, err := importccl.Load(ctx, sqlDB.DB.(*gosql.DB), buf, "data", dir, ts,
0, dir, dir, security.RootUser); err != nil {
b.Fatalf("%+v", err)
}
sqlDB.Exec(b, fmt.Sprintf(`RESTORE data.* FROM '%s'`, dir))
......
......@@ -74,10 +74,11 @@ func (r BackupFileDescriptors) Less(i, j int) bool {
func ReadBackupManifestFromURI(
ctx context.Context,
uri string,
user string,
makeExternalStorageFromURI cloud.ExternalStorageFromURIFactory,
encryption *roachpb.FileEncryptionOptions,
) (BackupManifest, error) {
exportStore, err := makeExternalStorageFromURI(ctx, uri)
exportStore, err := makeExternalStorageFromURI(ctx, uri, user)
if err != nil {
return BackupManifest{}, err
......@@ -349,13 +350,14 @@ func writeTableStatistics(
func loadBackupManifests(
ctx context.Context,
uris []string,
user string,
makeExternalStorageFromURI cloud.ExternalStorageFromURIFactory,
encryption *roachpb.FileEncryptionOptions,
) ([]BackupManifest, error) {
backupManifests := make([]BackupManifest, len(uris))
for i, uri := range uris {
desc, err := ReadBackupManifestFromURI(ctx, uri, makeExternalStorageFromURI, encryption)
desc, err := ReadBackupManifestFromURI(ctx, uri, user, makeExternalStorageFromURI, encryption)
if err != nil {
return nil, errors.Wrapf(err, "failed to read backup descriptor")
}
......@@ -442,6 +444,7 @@ func resolveBackupManifests(
from [][]string,
endTime hlc.Timestamp,
encryption *roachpb.FileEncryptionOptions,
user string,
) (
defaultURIs []string,
mainBackupManifests []BackupManifest,
......@@ -466,7 +469,7 @@ func resolveBackupManifests(
stores := make([]cloud.ExternalStorage, len(uris))
for j := range uris {
stores[j], err = mkStore(ctx, uris[j])
stores[j], err = mkStore(ctx, uris[j], user)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "export configuration")
}
......
......@@ -122,6 +122,7 @@ func makeImportSpans(
backups []BackupManifest,
backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo,
lowWaterMark roachpb.Key,
user string,
onMissing func(span covering.Range, start, end hlc.Timestamp) error,
) ([]importEntry, hlc.Timestamp, error) {
// Put the covering for the already-completed spans into the
......@@ -190,7 +191,7 @@ func makeImportSpans(
if backupLocalityInfo != nil && backupLocalityInfo[i].URIsByOriginalLocalityKV != nil {
storesByLocalityKV = make(map[string]roachpb.ExternalStorage)
for kv, uri := range backupLocalityInfo[i].URIsByOriginalLocalityKV {
conf, err := cloudimpl.ExternalStorageConfFromURI(uri)
conf, err := cloudimpl.ExternalStorageConfFromURI(uri, user)
if err != nil {
return nil, hlc.Timestamp{}, err
}
......@@ -592,6 +593,7 @@ func restore(
spans []roachpb.Span,
job *jobs.Job,
encryption *roachpb.FileEncryptionOptions,
user string,
) (RowCount, error) {
// A note about contexts and spans in this method: the top-level context
// `restoreCtx` is used for orchestration logging. All operations that carry
......@@ -628,7 +630,8 @@ func restore(
// Pivot the backups, which are grouped by time, into requests for import,
// which are grouped by keyrange.
highWaterMark := job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater
importSpans, _, err := makeImportSpans(spans, backupManifests, backupLocalityInfo, highWaterMark, errOnMissingRange)
importSpans, _, err := makeImportSpans(spans, backupManifests, backupLocalityInfo,
highWaterMark, user, errOnMissingRange)
if err != nil {
return mu.res, errors.Wrapf(err, "making import requests for %d backups", len(backupManifests))
}
......@@ -788,7 +791,8 @@ func loadBackupSQLDescs(
details jobspb.RestoreDetails,
encryption *roachpb.FileEncryptionOptions,
) ([]BackupManifest, BackupManifest, []sqlbase.Descriptor, error) {
backupManifests, err := loadBackupManifests(ctx, details.URIs, p.ExecCfg().DistSQLSrv.ExternalStorageFromURI, encryption)
backupManifests, err := loadBackupManifests(ctx, details.URIs,
p.User(), p.ExecCfg().DistSQLSrv.ExternalStorageFromURI, encryption)
if err != nil {
return nil, BackupManifest{}, nil, err
}
......@@ -1048,7 +1052,7 @@ func (r *restoreResumer) Resume(
if err != nil {
return err
}
defaultConf, err := cloudimpl.ExternalStorageConfFromURI(details.URIs[lastBackupIndex])
defaultConf, err := cloudimpl.ExternalStorageConfFromURI(details.URIs[lastBackupIndex], p.User())
if err != nil {
return errors.Wrapf(err, "creating external store configuration")
}
......@@ -1097,6 +1101,7 @@ func (r *restoreResumer) Resume(
spans,
r.job,
details.Encryption,
p.User(),
)
if err != nil {
return err
......
......@@ -924,7 +924,7 @@ func doRestorePlan(
}
baseStores := make([]cloud.ExternalStorage, len(from[0]))
for i := range from[0] {
store, err := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, from[0][i])
store, err := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, from[0][i], p.User())
if err != nil {
return errors.Wrapf(err, "failed to open backup storage location")
}
......@@ -944,6 +944,7 @@ func doRestorePlan(
defaultURIs, mainBackupManifests, localityInfo, err := resolveBackupManifests(
ctx, baseStores, p.ExecCfg().DistSQLSrv.ExternalStorageFromURI, from, endTime, encryption,
p.User(),
)
if err != nil {
return err
......
......@@ -86,7 +86,7 @@ func showBackupPlanHook(
return err
}
store, err := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, str)
store, err := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, str, p.User())
if err != nil {
return errors.Wrapf(err, "make storage")
}
......
......@@ -143,6 +143,7 @@ func distChangefeedFlow(
corePlacement[i].Core.ChangeAggregator = &execinfrapb.ChangeAggregatorSpec{
Watches: watches,
Feed: details,
User: phs.User(),
}
}
// NB: This SpanFrontier processor depends on the set of tracked spans being
......@@ -153,6 +154,7 @@ func distChangefeedFlow(
TrackedSpans: trackedSpans,
Feed: details,
JobID: jobID,
User: phs.User(),
}
p := sql.MakePhysicalPlan(gatewayNodeID)
......
......@@ -165,7 +165,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
if ca.sink, err = getSink(
ctx, ca.spec.Feed.SinkURI, nodeID, ca.spec.Feed.Opts, ca.spec.Feed.Targets,
ca.flowCtx.Cfg.Settings, timestampOracle, ca.flowCtx.Cfg.ExternalStorageFromURI,
ca.flowCtx.Cfg.Settings, timestampOracle, ca.flowCtx.Cfg.ExternalStorageFromURI, ca.spec.User,
); err != nil {
err = MarkRetryableError(err)
// Early abort in the case that there is an error creating the sink.
......@@ -554,7 +554,7 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context {
var nilOracle timestampLowerBoundOracle
if cf.sink, err = getSink(
ctx, cf.spec.Feed.SinkURI, nodeID, cf.spec.Feed.Opts, cf.spec.Feed.Targets,
cf.flowCtx.Cfg.Settings, nilOracle, cf.flowCtx.Cfg.ExternalStorageFromURI,
cf.flowCtx.Cfg.Settings, nilOracle, cf.flowCtx.Cfg.ExternalStorageFromURI, cf.spec.User,
); err != nil {
err = MarkRetryableError(err)
cf.MoveToDraining(err)
......
......@@ -261,7 +261,7 @@ func changefeedPlanHook(
var nilOracle timestampLowerBoundOracle
canarySink, err := getSink(
ctx, details.SinkURI, nodeID, details.Opts, details.Targets,
settings, nilOracle, p.ExecCfg().DistSQLSrv.ExternalStorageFromURI,
settings, nilOracle, p.ExecCfg().DistSQLSrv.ExternalStorageFromURI, p.User(),
)
if err != nil {
return MaybeStripRetryableErrorMarker(err)
......
......@@ -77,6 +77,7 @@ func getSink(
settings *cluster.Settings,
timestampOracle timestampLowerBoundOracle,
makeExternalStorageFromURI cloud.ExternalStorageFromURIFactory,
user string,
) (Sink, error) {
u, err := url.Parse(sinkURI)
if err != nil {
......@@ -192,7 +193,7 @@ func getSink(
makeSink = func() (Sink, error) {
return makeCloudStorageSink(
ctx, u.String(), nodeID, fileSize, settings,
opts, timestampOracle, makeExternalStorageFromURI,
opts, timestampOracle, makeExternalStorageFromURI, user,
)
}
case u.Scheme == changefeedbase.SinkSchemeExperimentalSQL:
......
......@@ -305,6 +305,7 @@ func makeCloudStorageSink(
opts map[string]string,
timestampOracle timestampLowerBoundOracle,
makeExternalStorageFromURI cloud.ExternalStorageFromURIFactory,
user string,
) (Sink, error) {
// Date partitioning is pretty standard, so no override for now, but we could
// plumb one down if someone needs it.
......@@ -362,7 +363,7 @@ func makeCloudStorageSink(
}
var err error
if s.es, err = makeExternalStorageFromURI(ctx, baseURI); err != nil {
if s.es, err = makeExternalStorageFromURI(ctx, baseURI, user); err != nil {
return nil, err
}
......
......@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/blobs"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
......@@ -98,10 +99,14 @@ func TestCloudStorageSink(t *testing.T) {
require.NoError(t, err)
clientFactory := blobs.TestBlobServiceClient(settings.ExternalIODir)
externalStorageFromURI := func(ctx context.Context, uri string) (cloud.ExternalStorage, error) {
return cloudimpl.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{}, settings, clientFactory)
externalStorageFromURI := func(ctx context.Context, uri, user string) (cloud.ExternalStorage,
error) {
return cloudimpl.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{}, settings,
clientFactory, user, nil, nil)
}
user := security.RootUser
t.Run(`golden`, func(t *testing.T) {
t1 := &sqlbase.TableDescriptor{Name: `t1`}
testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
......@@ -110,7 +115,7 @@ func TestCloudStorageSink(t *testing.T) {
sinkDir := `golden`
s, err := makeCloudStorageSink(
ctx, `nodelocal://0/`+sinkDir, 1, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
settings, opts, timestampOracle, externalStorageFromURI, user,
)
require.NoError(t, err)
s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.
......@@ -147,7 +152,7 @@ func TestCloudStorageSink(t *testing.T) {
dir := `single-node` + compression
s, err := makeCloudStorageSink(
ctx, `nodelocal://0/`+dir, 1, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
settings, opts, timestampOracle, externalStorageFromURI, user,
)
require.NoError(t, err)
s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.
......@@ -222,12 +227,12 @@ func TestCloudStorageSink(t *testing.T) {
dir := `multi-node`
s1, err := makeCloudStorageSink(
ctx, `nodelocal://0/`+dir, 1, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
settings, opts, timestampOracle, externalStorageFromURI, user,
)
require.NoError(t, err)
s2, err := makeCloudStorageSink(
ctx, `nodelocal://0/`+dir, 2, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
settings, opts, timestampOracle, externalStorageFromURI, user,
)
require.NoError(t, err)
// Hack into the sinks to pretend each is the first sink created on two
......@@ -256,12 +261,12 @@ func TestCloudStorageSink(t *testing.T) {
// this is unavoidable.
s1R, err := makeCloudStorageSink(
ctx, `nodelocal://0/`+dir, 1, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
settings, opts, timestampOracle, externalStorageFromURI, user,
)
require.NoError(t, err)
s2R, err := makeCloudStorageSink(
ctx, `nodelocal://0/`+dir, 2, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
settings, opts, timestampOracle, externalStorageFromURI, user,
)
require.NoError(t, err)
// Nodes restart. s1 gets the same sink id it had last time but s2
......@@ -302,14 +307,14 @@ func TestCloudStorageSink(t *testing.T) {
dir := `zombie`
s1, err := makeCloudStorageSink(
ctx, `nodelocal://0/`+dir, 1, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
settings, opts, timestampOracle, externalStorageFromURI, user,
)
require.NoError(t, err)
s1.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.
s1.(*cloudStorageSink).jobSessionID = "a" // Force deterministic job session ID.
s2, err := makeCloudStorageSink(
ctx, `nodelocal://0/`+dir, 1, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
settings, opts, timestampOracle, externalStorageFromURI, user,
)
require.NoError(t, err)
s2.(*cloudStorageSink).sinkID = 8 // Force a deterministic sinkID.
......@@ -344,7 +349,7 @@ func TestCloudStorageSink(t *testing.T) {
const targetMaxFileSize = 6
s, err := makeCloudStorageSink(
ctx, `nodelocal://0/`+dir, 1, targetMaxFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
settings, opts, timestampOracle, externalStorageFromURI, user,
)
require.NoError(t, err)
s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.
......@@ -431,7 +436,7 @@ func TestCloudStorageSink(t *testing.T) {
dir := `file-ordering`
s, err := makeCloudStorageSink(
ctx, `nodelocal://0/`+dir, 1, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
settings, opts, timestampOracle, externalStorageFromURI, user,
)
require.NoError(t, err)
......@@ -490,7 +495,7 @@ func TestCloudStorageSink(t *testing.T) {
dir := `ordering-among-schema-versions`
var targetMaxFileSize int64 = 10
s, err := makeCloudStorageSink(ctx, `nodelocal://0/`+dir, 1, targetMaxFileSize, settings,
opts, timestampOracle, externalStorageFromURI)
opts, timestampOracle, externalStorageFromURI, user)
require.NoError(t, err)
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1)))
......
......@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/blobs"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
"github.com/cockroachdb/cockroach/pkg/cli"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/cloudimpl"
......@@ -59,15 +60,17 @@ func runLoadShow(cmd *cobra.Command, args []string) error {
basepath = cloudimpl.MakeLocalStorageURI(basepath)
}
externalStorageFromURI := func(ctx context.Context, uri string) (cloud.ExternalStorage, error) {
externalStorageFromURI := func(ctx context.Context, uri string,
user string) (cloud.ExternalStorage, error) {
return cloudimpl.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{},
cluster.NoSettings, blobs.TestEmptyBlobClientFactory)
cluster.NoSettings, blobs.TestEmptyBlobClientFactory, user, nil, nil)
}
// This reads the raw backup descriptor (with table descriptors possibly not
// upgraded from the old FK representation, or even older formats). If more
// fields are added to the output, the table descriptors may need to be
// upgraded.
desc, err := backupccl.ReadBackupManifestFromURI(ctx, basepath, externalStorageFromURI, nil)
desc, err := backupccl.ReadBackupManifestFromURI(ctx, basepath, security.RootUser,
externalStorageFromURI, nil)
if err != nil {
return err
}
......
......@@ -238,7 +238,7 @@ func (sp *csvWriter) Run(ctx context.Context) {
return errors.Wrap(err, "failed to flush csv writer")
}
conf, err := cloudimpl.ExternalStorageConfFromURI(sp.spec.Destination)
conf, err := cloudimpl.ExternalStorageConfFromURI(sp.spec.Destination, sp.spec.User)
if err != nil {
return err
}
......
......@@ -166,7 +166,7 @@ func makeInputConverter(
case roachpb.IOFileFormat_CSV:
isWorkload := true
for _, file := range spec.Uri {
if conf, err := cloudimpl.ExternalStorageConfFromURI(file); err != nil || conf.Provider != roachpb.ExternalStorageProvider_Workload {
if conf, err := cloudimpl.ExternalStorageConfFromURI(file, spec.User); err != nil || conf.Provider != roachpb.ExternalStorageProvider_Workload {
isWorkload = false
break
}
......
......@@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
......@@ -120,7 +121,8 @@ func TestConverterFlushesBatches(t *testing.T) {
group := ctxgroup.WithContext(ctx)
group.Go(func() error {
defer close(kvCh)
return conv.readFiles(ctx, testCase.inputs, nil, converterSpec.Format, externalStorageFactory)
return conv.readFiles(ctx, testCase.inputs, nil, converterSpec.Format,
externalStorageFactory, security.RootUser)
})
lastBatch := 0
......@@ -859,7 +861,7 @@ func externalStorageFactory(
return nil, err
}
return cloudimpl.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{},
nil, blobs.TestBlobServiceClient(workdir))
nil, blobs.TestBlobServiceClient(workdir), nil, nil)
}
// Helper to create and initialize testSpec.
......
......@@ -256,7 +256,7 @@ func importPlanHook(
} else {
for _, file := range filenamePatterns {
if cloudimpl.URINeedsGlobExpansion(file) {
s, err := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, file)
s, err := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, file, p.User())
if err != nil {
return err
}
......@@ -606,7 +606,7 @@ func importPlanHook(
seqVals := make(map[sqlbase.ID]int64)
if importStmt.Bundle {
store, err := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, files[0])
store, err := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, files[0], p.User())
if err != nil {
return err
}
......@@ -660,7 +660,8 @@ func importPlanHook(
if err != nil {
return err
}
create, err = readCreateTableFromStore(ctx, filename, p.ExecCfg().DistSQLSrv.ExternalStorageFromURI)
create, err = readCreateTableFromStore(ctx, filename,
p.ExecCfg().DistSQLSrv.ExternalStorageFromURI, p.User())
if err != nil {
return err
}
......@@ -805,7 +806,7 @@ func parseAvroOptions(
"either %s or %s option must be set when importing avro record files", avroSchema, avroSchemaURI)
}
store, err := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, uri)
store, err := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, uri, p.User())
if err != nil {
return err
}
......
......@@ -39,9 +39,12 @@ const (
)
func readCreateTableFromStore(
ctx context.Context, filename string, externalStorageFromURI cloud.ExternalStorageFromURIFactory,
ctx context.Context,
filename string,
externalStorageFromURI cloud.ExternalStorageFromURIFactory,
user string,
) (*tree.CreateTable, error) {
store, err := externalStorageFromURI(ctx, filename)
store, err := externalStorageFromURI(ctx, filename, user)
if err != nil {
return nil, err
}
......
......@@ -107,8 +107,7 @@ func Load(
database, uri string,
ts hlc.Timestamp,
loadChunkBytes int64,
tempPrefix string,
writeToDir string,
tempPrefix, writeToDir, user string,
) (backupccl.BackupManifest, error) {
if loadChunkBytes == 0 {
loadChunkBytes = *zonepb.DefaultZoneConfig().RangeMaxBytes / 2
......@@ -123,12 +122,12 @@ func Load(
semaCtx := tree.MakeSemaContext()
blobClientFactory := blobs.TestBlobServiceClient(writeToDir)
conf, err := cloudimpl.ExternalStorageConfFromURI(uri)
conf, err := cloudimpl.ExternalStorageConfFromURI(uri, user)
if err != nil {
return backupccl.BackupManifest{}, err
}
dir, err := cloudimpl.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{},
cluster.NoSettings, blobClientFactory)
cluster.NoSettings, blobClientFactory, nil, nil)
if err != nil {
return backupccl.BackupManifest{}, errors.Wrap(err, "export storage from URI")
}
......
......@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/importccl"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
......@@ -113,7 +114,8 @@ func TestImportChunking(t *testing.T) {
}
ts := hlc.Timestamp{WallTime: hlc.UnixNano()}