colexec: add Closers to NewColOperatorResult

Release justification: bug fixes and low-risk updates to new functionality.
External operators would not be Closed unless they were drained completely,
which does not happen when there is a downstream limit. The flow would still
clean up disk space and file descriptors on Cleanup, but this commit makes
sure that Close is called on these operators even when they are not fully
drained.

For now, only external storage operators need to be explicitly told to close
when they are not drained of their input (e.g. due to a downstream limit).
When creating such a component, it can now be added to result.ToClose, which
registers it with the first downstream outbox or materializer (there must be
at least one in the flow) so that it is closed on either graceful or
non-graceful termination.

Release note: None (no observable impact)
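
For reviewers, a minimal self-contained sketch of the lifecycle this commit
sets up (all names below are illustrative stand-ins, not the actual colexec
types): a resource-holding operator implements an idempotent Close, is
registered with the terminal consumer at creation time, and is closed on
termination even if a limit stopped the consumer from draining it.

package main

import (
	"context"
	"fmt"
)

// IdempotentCloser releases resources on the first call and is a no-op after.
type IdempotentCloser interface {
	IdempotentClose(ctx context.Context) error
}

// externalOp stands in for an operator that owns temp files/file descriptors.
type externalOp struct {
	closed bool
}

func (o *externalOp) IdempotentClose(ctx context.Context) error {
	if o.closed {
		return nil
	}
	o.closed = true
	fmt.Println("released temp files and file descriptors")
	return nil
}

// consumer stands in for the first downstream outbox or materializer: the
// terminal component that is guaranteed to exist in the flow and to
// terminate.
type consumer struct {
	toClose []IdempotentCloser
}

// terminate runs on both graceful and non-graceful termination, so closers
// fire even when a downstream limit stopped the consumer early.
func (c *consumer) terminate(ctx context.Context) {
	for _, cl := range c.toClose {
		if err := cl.IdempotentClose(ctx); err != nil {
			fmt.Println("error closing:", err)
		}
	}
}

func main() {
	op := &externalOp{}
	c := &consumer{toClose: []IdempotentCloser{op}}
	c.terminate(context.Background())
	c.terminate(context.Background()) // safe: Close is idempotent
}

Idempotency matters because a fully drained operator closes itself (see the
externalHJFinished and externalSorterFinished cases below), and the
materializer/outbox will call Close again on termination.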
......@@ -179,6 +179,8 @@ func newTwoInputDiskSpiller(
type diskSpillerBase struct {
NonExplainable
closerHelper
inputs []Operator
spilled bool
......@@ -251,9 +253,17 @@ func (d *diskSpillerBase) reset(ctx context.Context) {
d.spilled = false
}
func (d *diskSpillerBase) Close(ctx context.Context) error {
if c, ok := d.diskBackedOp.(closer); ok {
return c.Close(ctx)
// IdempotentClose closes the diskSpillerBase's disk-backed operator.
// TODO(asubiotto): Remove this method. It only exists so that we can call Close
// from some runTests subtests when not draining the input fully. The tests
// should pass in the testing.T object used so that the caller can decide
// whether to explicitly close the input after checking the test name.
func (d *diskSpillerBase) IdempotentClose(ctx context.Context) error {
if !d.close() {
return nil
}
if c, ok := d.diskBackedOp.(IdempotentCloser); ok {
return c.IdempotentClose(ctx)
}
return nil
}
......
......@@ -71,6 +71,7 @@ func wrapRowSources(
&execinfrapb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* toClose */
nil, /* outputStatsToTrace */
nil, /* cancelFlow */
)
......@@ -138,7 +139,10 @@ type NewColOperatorResult struct {
ColumnTypes []types.T
InternalMemUsage int
MetadataSources []execinfrapb.MetadataSource
IsStreaming bool
// ToClose is a slice of components that need to be Closed. Close should be
// idempotent.
ToClose []IdempotentCloser
IsStreaming bool
// CanRunInAutoMode returns whether the result can be run in auto mode if
// IsStreaming is false. This applies to operators that can spill to disk,
// but also operators such as the hash aggregator that buffer, but not
......@@ -426,7 +430,7 @@ func (r *NewColOperatorResult) createDiskBackedSort(
if args.TestingKnobs.NumForcedRepartitions != 0 {
maxNumberPartitions = args.TestingKnobs.NumForcedRepartitions
}
return newExternalSorter(
es := newExternalSorter(
ctx,
unlimitedAllocator,
standaloneMemAccount,
......@@ -438,6 +442,8 @@ func (r *NewColOperatorResult) createDiskBackedSort(
args.FDSemaphore,
diskAccount,
)
r.ToClose = append(r.ToClose, es.(IdempotentCloser))
return es
},
args.TestingKnobs.SpillingCallbackFn,
), nil
......@@ -820,7 +826,7 @@ func NewColOperator(
diskQueueCfg := args.DiskQueueCfg
diskQueueCfg.CacheMode = colcontainer.DiskQueueCacheModeClearAndReuseCache
diskQueueCfg.SetDefaultBufferSizeBytesForCacheMode()
return newExternalHashJoiner(
ehj := newExternalHashJoiner(
unlimitedAllocator, hjSpec,
inputOne, inputTwo,
execinfra.GetWorkMemLimit(flowCtx.Cfg),
......@@ -844,6 +850,8 @@ func NewColOperator(
args.TestingKnobs.DelegateFDAcquisitions,
diskAccount,
)
result.ToClose = append(result.ToClose, ehj.(IdempotentCloser))
return ehj
},
args.TestingKnobs.SpillingCallbackFn,
)
......@@ -902,7 +910,7 @@ func NewColOperator(
ctx, flowCtx, monitorName,
))
diskAccount := result.createDiskAccount(ctx, flowCtx, monitorName)
result.Op, err = newMergeJoinOp(
mj, err := newMergeJoinOp(
unlimitedAllocator, execinfra.GetWorkMemLimit(flowCtx.Cfg),
args.DiskQueueCfg, args.FDSemaphore,
joinType, inputs[0], inputs[1], leftPhysTypes, rightPhysTypes,
......@@ -913,6 +921,8 @@ func NewColOperator(
return result, err
}
result.Op = mj
result.ToClose = append(result.ToClose, mj.(IdempotentCloser))
result.ColumnTypes = append(leftLogTypes, rightLogTypes...)
if onExpr != nil {
......@@ -1034,6 +1044,12 @@ func NewColOperator(
args.FDSemaphore, input, typs, windowFn, wf.Ordering.Columns,
int(wf.OutputColIdx+tempColOffset), partitionColIdx, peersColIdx, diskAcc,
)
// NewRelativeRankOperator sometimes returns a constOp when there
// are no ordering columns, so we check that the returned operator
// is an IdempotentCloser.
if c, ok := result.Op.(IdempotentCloser); ok {
result.ToClose = append(result.ToClose, c)
}
// Relative rank operators are buffering operators, and we
// are not comfortable running them with `auto` mode.
canRunInAutoMode = false
......
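
A note on the registration just above: since a constructor may return an
operator that holds no external resources (e.g. a constOp), registration
type-asserts first. A hypothetical sketch of that pattern (all names are
illustrative, not the actual colexec types):

package main

import "context"

type IdempotentCloser interface {
	IdempotentClose(ctx context.Context) error
}

type Operator interface{ Next() }

type constOp struct{} // holds no external resources

func (constOp) Next() {}

type diskBackedOp struct{} // owns disk-backed state that must be released

func (diskBackedOp) Next() {}

func (diskBackedOp) IdempotentClose(ctx context.Context) error { return nil }

// newRankOp stands in for a constructor that returns a plain operator when
// there are no ordering columns and a disk-backed one otherwise.
func newRankOp(hasOrderingCols bool) Operator {
	if !hasOrderingCols {
		return constOp{}
	}
	return diskBackedOp{}
}

func main() {
	var toClose []IdempotentCloser
	op := newRankOp(true)
	// Only register operators that actually implement IdempotentCloser.
	if c, ok := op.(IdempotentCloser); ok {
		toClose = append(toClose, c)
	}
	_ = toClose
}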
......@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/marusama/semaphore"
)
......@@ -152,8 +153,14 @@ const (
type externalHashJoiner struct {
twoInputNode
NonExplainable
closerHelper
// mu is used to protect against concurrent IdempotentClose and Next calls,
// which are currently allowed.
// TODO(asubiotto): Explore calling IdempotentClose from the same goroutine as
// Next, which will simplify this model.
mu syncutil.Mutex
closed bool
state externalHashJoinerState
unlimitedAllocator *Allocator
spec hashJoinerSpec
......@@ -475,6 +482,8 @@ func (hj *externalHashJoiner) partitionBatch(
}
func (hj *externalHashJoiner) Next(ctx context.Context) coldata.Batch {
hj.mu.Lock()
defer hj.mu.Unlock()
StateChanged:
for {
switch hj.state {
......@@ -688,7 +697,7 @@ StateChanged:
return b
case externalHJFinished:
if err := hj.Close(ctx); err != nil {
if err := hj.idempotentCloseLocked(ctx); err != nil {
execerror.VectorizedInternalPanic(err)
}
return coldata.ZeroBatch
......@@ -698,8 +707,14 @@ StateChanged:
}
}
func (hj *externalHashJoiner) Close(ctx context.Context) error {
if hj.closed {
func (hj *externalHashJoiner) IdempotentClose(ctx context.Context) error {
hj.mu.Lock()
defer hj.mu.Unlock()
return hj.idempotentCloseLocked(ctx)
}
func (hj *externalHashJoiner) idempotentCloseLocked(ctx context.Context) error {
if !hj.close() {
return nil
}
var retErr error
......@@ -709,8 +724,8 @@ func (hj *externalHashJoiner) Close(ctx context.Context) error {
if err := hj.rightPartitioner.Close(ctx); err != nil && retErr == nil {
retErr = err
}
if c, ok := hj.diskBackedSortMerge.(closer); ok {
if err := c.Close(ctx); err != nil && retErr == nil {
if c, ok := hj.diskBackedSortMerge.(IdempotentCloser); ok {
if err := c.IdempotentClose(ctx); err != nil && retErr == nil {
retErr = err
}
}
......@@ -718,6 +733,5 @@ func (hj *externalHashJoiner) Close(ctx context.Context) error {
hj.fdState.fdSemaphore.Release(hj.fdState.acquiredFDs)
hj.fdState.acquiredFDs = 0
}
hj.closed = true
return retErr
}
......@@ -62,18 +62,6 @@ func TestExternalHashJoiner(t *testing.T) {
for _, tc := range tcs {
delegateFDAcquisitions := rng.Float64() < 0.5
t.Run(fmt.Sprintf("spillForced=%t/%s/delegateFDAcquisitions=%t", spillForced, tc.description, delegateFDAcquisitions), func(t *testing.T) {
// Unfortunately, there is currently no better way to check that the
// external hash joiner does not have leftover file descriptors other
// than appending each semaphore used to this slice on construction.
// This is because some tests don't fully drain the input, making
// intercepting the Close() method not a useful option, since it is
// impossible to check between an expected case where more than 0 FDs
// are open (e.g. in allNullsInjection, where the joiner is not fully
// drained so Close must be called explicitly) and an unexpected one.
// These cases happen during normal execution when a limit is
// satisfied, but flows will call Close explicitly on Cleanup.
// TODO(yuzefovich): not implemented yet, currently we rely on the
// flow tracking open FDs and releasing any leftovers.
var semsToCheck []semaphore.Semaphore
if !tc.onExpr.Empty() {
// When we have ON expression, there might be other operators (like
......@@ -90,10 +78,20 @@ func TestExternalHashJoiner(t *testing.T) {
sem := NewTestingSemaphore(externalHJMinPartitions)
semsToCheck = append(semsToCheck, sem)
spec := createSpecForHashJoiner(tc)
hjOp, newAccounts, newMonitors, err := createDiskBackedHashJoiner(
// TODO(asubiotto): Pass in the testing.T of the caller to this
// function and do substring matching on the test name to
// conditionally explicitly call Close() on the hash joiner
// (through result.ToClose) in cases where it is known the joiner
// will not be drained.
hjOp, newAccounts, newMonitors, closers, err := createDiskBackedHashJoiner(
ctx, flowCtx, spec, sources, func() {}, queueCfg,
2 /* numForcedPartitions */, delegateFDAcquisitions, sem,
)
// Expect three closers. These are the external hash joiner, and
// one external sorter for each input.
// TODO(asubiotto): Explicitly Close when testing.T is passed into
// this constructor and we do a substring match.
require.Equal(t, 3, len(closers))
accounts = append(accounts, newAccounts...)
monitors = append(monitors, newMonitors...)
return hjOp, err
......@@ -153,10 +151,14 @@ func TestExternalHashJoinerFallbackToSortMergeJoin(t *testing.T) {
var spilled bool
queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(t, true /* inMem */)
defer cleanup()
hj, accounts, monitors, err := createDiskBackedHashJoiner(
sem := NewTestingSemaphore(externalHJMinPartitions)
// Ignore closers since the joiner (which also closes its sort-merge
// fallback) should close itself when it is drained of all tuples. We
// assert this by checking that the semaphore reports a count of 0.
hj, accounts, monitors, _, err := createDiskBackedHashJoiner(
ctx, flowCtx, spec, []Operator{leftSource, rightSource},
func() { spilled = true }, queueCfg, 0 /* numForcedRepartitions */, true, /* delegateFDAcquisitions */
NewTestingSemaphore(externalHJMinPartitions),
sem,
)
defer func() {
for _, acc := range accounts {
......@@ -177,6 +179,7 @@ func TestExternalHashJoinerFallbackToSortMergeJoin(t *testing.T) {
}
require.True(t, spilled)
require.Equal(t, expectedTuplesCount, actualTuplesCount)
require.Equal(t, 0, sem.GetCount())
}
func BenchmarkExternalHashJoiner(b *testing.B) {
......@@ -255,7 +258,7 @@ func BenchmarkExternalHashJoiner(b *testing.B) {
for i := 0; i < b.N; i++ {
leftSource.reset(nBatches)
rightSource.reset(nBatches)
hj, accounts, monitors, err := createDiskBackedHashJoiner(
hj, accounts, monitors, _, err := createDiskBackedHashJoiner(
ctx, flowCtx, spec, []Operator{leftSource, rightSource},
func() {}, queueCfg, 0 /* numForcedRepartitions */, false, /* delegateFDAcquisitions */
NewTestingSemaphore(VecMaxOpenFDsLimit),
......@@ -295,7 +298,7 @@ func createDiskBackedHashJoiner(
numForcedRepartitions int,
delegateFDAcquisitions bool,
testingSemaphore semaphore.Semaphore,
) (Operator, []*mon.BoundAccount, []*mon.BytesMonitor, error) {
) (Operator, []*mon.BoundAccount, []*mon.BytesMonitor, []IdempotentCloser, error) {
args := NewColOperatorArgs{
Spec: spec,
Inputs: inputs,
......@@ -310,5 +313,5 @@ func createDiskBackedHashJoiner(
args.TestingKnobs.NumForcedRepartitions = numForcedRepartitions
args.TestingKnobs.DelegateFDAcquisitions = delegateFDAcquisitions
result, err := NewColOperator(ctx, flowCtx, args)
return result.Op, result.OpAccounts, result.OpMonitors, err
return result.Op, result.OpAccounts, result.OpMonitors, result.ToClose, err
}
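
The semaphore assertion in the fallback test above generalizes into a simple
FD-leak check: thread a counting semaphore through the operator and require a
zero count once it has drained. A sketch under assumed names (GetCount mirrors
the accessor asserted on above; the rest is hypothetical):

package main

import "fmt"

// testingSemaphore is a hypothetical stand-in for the counting semaphore the
// tests thread through the operators.
type testingSemaphore struct{ count int }

func (s *testingSemaphore) Acquire(n int) { s.count += n }
func (s *testingSemaphore) Release(n int) { s.count -= n }
func (s *testingSemaphore) GetCount() int { return s.count }

// runToCompletion emulates an operator that acquires file descriptors while
// spilling and releases them when it is fully drained.
func runToCompletion(sem *testingSemaphore) {
	sem.Acquire(3) // open partitions
	// ... drain all tuples ...
	sem.Release(3) // operator closes itself once drained
}

func main() {
	sem := &testingSemaphore{}
	runToCompletion(sem)
	// A nonzero count here would mean leaked file descriptors.
	fmt.Println("leftover FDs:", sem.GetCount())
}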
......@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/marusama/semaphore"
)
......@@ -109,8 +110,14 @@ const externalSorterMinPartitions = 3
type externalSorter struct {
OneInputNode
NonExplainable
closerHelper
// mu is used to protect against concurrent IdempotentClose and Next calls,
// which are currently allowed.
// TODO(asubiotto): Explore calling IdempotentClose from the same goroutine as
// Next, which will simplify this model.
mu syncutil.Mutex
closed bool
unlimitedAllocator *Allocator
state externalSorterState
inputTypes []coltypes.T
......@@ -239,6 +246,8 @@ func (s *externalSorter) Init() {
}
func (s *externalSorter) Next(ctx context.Context) coldata.Batch {
s.mu.Lock()
defer s.mu.Unlock()
for {
switch s.state {
case externalSorterNewPartition:
......@@ -344,7 +353,7 @@ func (s *externalSorter) Next(ctx context.Context) coldata.Batch {
}
return b
case externalSorterFinished:
if err := s.Close(ctx); err != nil {
if err := s.internalCloseLocked(ctx); err != nil {
execerror.VectorizedInternalPanic(err)
}
return coldata.ZeroBatch
......@@ -359,18 +368,16 @@ func (s *externalSorter) reset(ctx context.Context) {
r.reset(ctx)
}
s.state = externalSorterNewPartition
if err := s.Close(ctx); err != nil {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.internalCloseLocked(ctx); err != nil {
execerror.VectorizedInternalPanic(err)
}
s.closed = false
s.firstPartitionIdx = 0
s.numPartitions = 0
}
func (s *externalSorter) Close(ctx context.Context) error {
if s.closed {
return nil
}
func (s *externalSorter) internalCloseLocked(ctx context.Context) error {
var lastErr error
if s.partitioner != nil {
lastErr = s.partitioner.Close(ctx)
......@@ -383,10 +390,18 @@ func (s *externalSorter) Close(ctx context.Context) error {
s.fdState.fdSemaphore.Release(s.fdState.acquiredFDs)
s.fdState.acquiredFDs = 0
}
s.closed = true
return lastErr
}
func (s *externalSorter) IdempotentClose(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
if !s.close() {
return nil
}
return s.internalCloseLocked(ctx)
}
// createMergerForPartitions creates an ordered synchronizer that will merge
// partitions in [firstIdx, firstIdx+numPartitions) range.
func (s *externalSorter) createMergerForPartitions(firstIdx, numPartitions int) Operator {
......
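
The locking discipline introduced for the external sorter (and mirrored in the
external hash joiner, merge joiner, and relative-rank operators) is: Next
holds mu for its entire body, and IdempotentClose takes mu before touching the
closed flag, so a close from another goroutine cannot interleave with an
in-flight Next. A condensed, hypothetical sketch of that pattern:

package main

import (
	"context"
	"sync"
)

// op condenses the pattern: mu serializes Next with IdempotentClose, and the
// closed flag is only checked and flipped under mu.
type op struct {
	mu     sync.Mutex
	closed bool
	fds    int // stand-in for acquired file descriptors
}

func (o *op) Next(ctx context.Context) int {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.closed {
		return 0 // zero-length batch
	}
	return 1 // produce a batch
}

func (o *op) IdempotentClose(ctx context.Context) error {
	o.mu.Lock()
	defer o.mu.Unlock()
	return o.idempotentCloseLocked(ctx)
}

// idempotentCloseLocked exists so that Next, which already holds mu, can
// close the operator itself when it is fully drained.
func (o *op) idempotentCloseLocked(ctx context.Context) error {
	if o.closed {
		return nil
	}
	o.closed = true
	o.fds = 0 // release resources exactly once
	return nil
}

func main() {
	o := &op{fds: 3}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); o.Next(context.Background()) }()
	go func() { defer wg.Done(); _ = o.IdempotentClose(context.Background()) }()
	wg.Wait()
}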
......@@ -68,18 +68,6 @@ func TestExternalSort(t *testing.T) {
for _, tcs := range [][]sortTestCase{sortAllTestCases, topKSortTestCases, sortChunksTestCases} {
for _, tc := range tcs {
t.Run(fmt.Sprintf("spillForced=%t/%s", spillForced, tc.description), func(t *testing.T) {
// Unfortunately, there is currently no better way to check that a
// sorter does not have leftover file descriptors other than appending
// each semaphore used to this slice on construction. This is because
// some tests don't fully drain the input, making intercepting the
// sorter.Close() method not a useful option, since it is impossible
// to check between an expected case where more than 0 FDs are open
// (e.g. in verifySelAndNullResets, where the sorter is not fully
// drained so Close must be called explicitly) and an unexpected one.
// These cases happen during normal execution when a limit is
// satisfied, but flows will call Close explicitly on Cleanup.
// TODO(asubiotto): Not implemented yet, currently we rely on the
// flow tracking open FDs and releasing any leftovers.
var semsToCheck []semaphore.Semaphore
runTests(
t,
......@@ -93,15 +81,25 @@ func TestExternalSort(t *testing.T) {
sem := NewTestingSemaphore(externalSorterMinPartitions)
// If a limit is satisfied before the sorter is drained of all its
// tuples, the sorter will not close its partitioner. During a
// flow this will happen in Cleanup, since there is no way to tell
// an operator that Next won't be called again.
// flow this will happen in a downstream materializer/outbox,
// since there is no way to tell an operator that Next won't be
// called again.
if tc.k == 0 || int(tc.k) >= len(tc.tuples) {
semsToCheck = append(semsToCheck, sem)
}
sorter, newAccounts, newMonitors, err := createDiskBackedSorter(
// TODO(asubiotto): Pass in the testing.T of the caller to this
// function and do substring matching on the test name to
// conditionally explicitly call Close() on the sorter (through
// result.ToClose) in cases where it is known the sorter will not
// be drained.
sorter, newAccounts, newMonitors, closers, err := createDiskBackedSorter(
ctx, flowCtx, input, tc.logTypes, tc.ordCols, tc.matchLen, tc.k, func() {},
externalSorterMinPartitions, false /* delegateFDAcquisition */, queueCfg, sem,
)
// Check that the sorter was added as a Closer.
// TODO(asubiotto): Explicitly Close when testing.T is passed into
// this constructor and we do a substring match.
require.Equal(t, 1, len(closers))
accounts = append(accounts, newAccounts...)
monitors = append(monitors, newMonitors...)
return sorter, err
......@@ -198,10 +196,13 @@ func TestExternalSortRandomized(t *testing.T) {
func(input []Operator) (Operator, error) {
sem := NewTestingSemaphore(externalSorterMinPartitions)
semsToCheck = append(semsToCheck, sem)
sorter, newAccounts, newMonitors, err := createDiskBackedSorter(
sorter, newAccounts, newMonitors, closers, err := createDiskBackedSorter(
ctx, flowCtx, input, logTypes[:nCols], ordCols,
0 /* matchLen */, 0 /* k */, func() {},
externalSorterMinPartitions, delegateFDAcquisition, queueCfg, sem)
// TODO(asubiotto): Explicitly Close when testing.T is passed into
// this constructor and we do a substring match.
require.Equal(t, 1, len(closers))
accounts = append(accounts, newAccounts...)
monitors = append(monitors, newMonitors...)
return sorter, err
......@@ -276,7 +277,7 @@ func BenchmarkExternalSort(b *testing.B) {
// TODO(yuzefovich): do not specify maxNumberPartitions (let the
// external sorter figure out that number itself) once we pass in
// filled-in disk queue config.
sorter, accounts, monitors, err := createDiskBackedSorter(
sorter, accounts, monitors, _, err := createDiskBackedSorter(
ctx, flowCtx, []Operator{source}, logTypes, ordCols,
0 /* matchLen */, 0 /* k */, func() { spilled = true },
64 /* maxNumberPartitions */, false /* delegateFDAcquisitions */, queueCfg, &TestingSemaphore{},
......@@ -323,7 +324,7 @@ func createDiskBackedSorter(
delegateFDAcquisitions bool,
diskQueueCfg colcontainer.DiskQueueCfg,
testingSemaphore semaphore.Semaphore,
) (Operator, []*mon.BoundAccount, []*mon.BytesMonitor, error) {
) (Operator, []*mon.BoundAccount, []*mon.BytesMonitor, []IdempotentCloser, error) {
sorterSpec := &execinfrapb.SorterSpec{
OutputOrdering: execinfrapb.Ordering{Columns: ordCols},
OrderingMatchLen: uint32(matchLen),
......@@ -351,5 +352,5 @@ func createDiskBackedSorter(
args.TestingKnobs.NumForcedRepartitions = maxNumberPartitions
args.TestingKnobs.DelegateFDAcquisitions = delegateFDAcquisitions
result, err := NewColOperator(ctx, flowCtx, args)
return result.Op, result.OpAccounts, result.OpMonitors, err
return result.Op, result.OpAccounts, result.OpMonitors, result.ToClose, err
}
......@@ -20,6 +20,7 @@ import (
// tuples from its input.
type limitOp struct {
OneInputNode
closerHelper
limit int
......@@ -64,17 +65,17 @@ func (c *limitOp) Next(ctx context.Context) coldata.Batch {
return bat
}
// Close is a temporary method to support the specific case in which an upstream
// operator must be Closed (e.g. an external sorter) to assert a certain state
// during tests.
// TODO(asubiotto): This method only exists because an external sorter is
// wrapped with a limit op when doing a top K sort and some tests that don't
// exhaust the sorter (e.g. allNullsInjection) need to close the operator
// explicitly. This should be removed once we have a better way of closing
// operators even when they are not exhausted.
func (c *limitOp) Close(ctx context.Context) error {
if c, ok := c.input.(closer); ok {
return c.Close(ctx)
// IdempotentClose closes the limitOp's input.
// TODO(asubiotto): Remove this method. It only exists so that we can call Close
// from some runTests subtests when not draining the input fully. The tests
// should pass in the testing.T object used so that the caller can decide
// whether to explicitly close the input after checking the test name.
func (c *limitOp) IdempotentClose(ctx context.Context) error {
if !c.close() {
return nil
}
if closer, ok := c.input.(IdempotentCloser); ok {
return closer.IdempotentClose(ctx)
}
return nil
}
......@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
)
// Materializer converts an Operator input into a execinfra.RowSource.
......@@ -53,6 +54,10 @@ type Materializer struct {
// ctxCancel in that it will cancel all components of the Materializer's flow,
// including those started asynchronously.
cancelFlow func() context.CancelFunc
// closers is a slice of IdempotentClosers that should be Closed on
// termination.
closers []IdempotentCloser
}
const materializerProcName = "materializer"
......@@ -76,12 +81,14 @@ func NewMaterializer(
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
metadataSourcesQueue []execinfrapb.MetadataSource,
toClose []IdempotentCloser,
outputStatsToTrace func(),
cancelFlow func() context.CancelFunc,
) (*Materializer, error) {
m := &Materializer{
input: input,
row: make(sqlbase.EncDatumRow, len(typs)),
input: input,
row: make(sqlbase.EncDatumRow, len(typs)),
closers: toClose,
}
if err := m.ProcessorBase.Init(
......@@ -187,6 +194,13 @@ func (m *Materializer) InternalClose() bool {
if m.cancelFlow != nil {
m.cancelFlow()()
}
for _, closer := range m.closers {
if err := closer.IdempotentClose(m.Ctx); err != nil {
if log.V(1) {
log.Infof(m.Ctx, "error closing Closer: %v", err)
}
}
}
return true
}
return false
......
......@@ -57,6 +57,7 @@ func TestColumnarizeMaterialize(t *testing.T) {
&execinfrapb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* toClose */
nil, /* outputStatsToTrace */
nil, /* cancelFlow */
)
......@@ -142,6 +143,7 @@ func TestMaterializeTypes(t *testing.T) {
&execinfrapb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* toClose */
nil, /* outputStatsToTrace */
nil, /* cancelFlow */
)
......@@ -196,6 +198,7 @@ func BenchmarkColumnarizeMaterialize(b *testing.B) {
&execinfrapb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* toClose */
nil, /* outputStatsToTrace */
nil, /* cancelFlow */
)
......
......@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/marusama/semaphore"
)
......@@ -296,7 +297,7 @@ func newMergeJoinBase(
leftOrdering []execinfrapb.Ordering_Column,
rightOrdering []execinfrapb.Ordering_Column,
diskAcc *mon.BoundAccount,
) (mergeJoinBase, error) {
) (*mergeJoinBase, error) {
lEqCols := make([]uint32, len(leftOrdering))
lDirections := make([]execinfrapb.Ordering_Column_Direction, len(leftOrdering))
for i, c := range leftOrdering {
......@@ -313,7 +314,7 @@ func newMergeJoinBase(
diskQueueCfg.CacheMode = colcontainer.DiskQueueCacheModeReuseCache
diskQueueCfg.SetDefaultBufferSizeBytesForCacheMode()
base := mergeJoinBase{
base := &mergeJoinBase{
twoInputNode: newTwoInputNode(left, right),
unlimitedAllocator: unlimitedAllocator,
memoryLimit: memoryLimit,
......@@ -354,6 +355,13 @@ func newMergeJoinBase(
// mergeJoinBase extracts the common logic between all merge join operators.
type mergeJoinBase struct {
twoInputNode
closerHelper
// mu is used to protect against concurrent IdempotentClose and Next calls,
// which are currently allowed.
// TODO(asubiotto): Explore calling IdempotentClose from the same goroutine as
// Next, which will simplify this model.
mu syncutil.Mutex
unlimitedAllocator *Allocator
memoryLimit int64
......@@ -395,7 +403,7 @@ type mergeJoinBase struct {
}
var _ resetter = &mergeJoinBase{}
var _ closer = &mergeJoinBase{}
var _ IdempotentCloser = &mergeJoinBase{}
func (o *mergeJoinBase) reset(ctx context.Context) {
if r, ok := o.left.source.(resetter); ok {
......@@ -694,11 +702,16 @@ func (o *mergeJoinBase) finishProbe(ctx context.Context) {
)
}
func (o *mergeJoinBase) Close(ctx context.Context) error {
func (o *mergeJoinBase) IdempotentClose(ctx context.Context) error {
o.mu.Lock()
defer o.mu.Unlock()
if !o.close() {
return nil
}
var lastErr error
for _, op := range []Operator{o.left.source, o.right.source} {
if c, ok := op.(closer); ok {
if err := c.Close(ctx); err != nil {
if c, ok := op.(IdempotentCloser); ok {
if err := c.IdempotentClose(ctx); err != nil {
lastErr = err
}
}
......
......@@ -106,7 +106,7 @@ const _MJ_OVERLOAD = 0
// */}}
type mergeJoin_JOIN_TYPE_STRINGOp struct {
mergeJoinBase
*mergeJoinBase
}
var _ InternalMemoryOperator = &mergeJoin_JOIN_TYPE_STRINGOp{}
......@@ -1437,6 +1437,8 @@ func _SOURCE_FINISHED_SWITCH(_JOIN_TYPE joinTypeInfo) { // */}}
// */}}
func (o *mergeJoin_JOIN_TYPE_STRINGOp) Next(ctx context.Context) coldata.Batch {
o.mu.Lock()
defer o.mu.Unlock()
o.output.ResetInternalBatch()
for {
switch o.state {
......
......@@ -162,13 +162,33 @@ type resettableOperator interface {
resetter
}
type closer interface {
Close(ctx context.Context) error
// IdempotentCloser is an object that releases its resources on the first call
// to IdempotentClose and does nothing on any subsequent call.
type IdempotentCloser interface {
IdempotentClose(ctx context.Context) error
}
// closerHelper is a simple helper that helps Operators implement
// IdempotentCloser. If close returns true, resources may be released; if it
// returns false, close has already been called.
type closerHelper struct {
closed bool
}
// close marks the closerHelper as closed. If true is returned, this is the
// first call to close.
func (c *closerHelper) close() bool {
if c.closed {
return false
}
c.closed = true
return true
}
type closableOperator interface {
Operator
closer
IdempotentCloser
}
type noopOperator struct {
......
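
To make the closerHelper contract concrete, here is a hypothetical operator
embedding it (a sketch, not code from this change):

package main

import (
	"context"
	"fmt"
)

// closerHelper mirrors the helper above: close() reports whether this is the
// first call, so resources are released exactly once.
type closerHelper struct {
	closed bool
}

func (c *closerHelper) close() bool {
	if c.closed {
		return false
	}
	c.closed = true
	return true
}

// tempFileOp is a hypothetical operator that owns a releasable resource.
type tempFileOp struct {
	closerHelper
}

func (o *tempFileOp) IdempotentClose(ctx context.Context) error {
	if !o.close() {
		return nil // already closed; do nothing on repeat calls
	}
	fmt.Println("deleting temp files")
	return nil
}

func main() {
	o := &tempFileOp{}
	_ = o.IdempotentClose(context.Background()) // releases
	_ = o.IdempotentClose(context.Background()) // no-op
}

The helper deliberately leaves synchronization to the embedding operator; the
operators in this change pair it with a mutex when IdempotentClose may race
with Next.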
......@@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/marusama/semaphore"
)
......@@ -204,8 +205,8 @@ func _COMPUTE_PEER_GROUPS_SIZES() { // */}}
type relativeRankInitFields struct {
rankInitFields
closerHelper
closed bool
state relativeRankState
memoryLimit int64
diskQueueCfg colcontainer.DiskQueueCfg
......@@ -240,6 +241,12 @@ const relativeRankUtilityQueueMemLimitFraction = 0.1
type _RELATIVE_RANK_STRINGOp struct {
relativeRankInitFields
// mu is used to protect against concurrent IdempotentClose and Next calls,
// which are currently allowed.
// TODO(asubiotto): Explore calling IdempotentClose from the same goroutine as
// Next, which will simplify this model.
mu syncutil.Mutex
// {{if .IsPercentRank}}
// rank indicates which rank should be assigned to the next tuple.
rank int64
......@@ -308,6 +315,8 @@ func (r *_RELATIVE_RANK_STRINGOp) Init() {
}
func (r *_RELATIVE_RANK_STRINGOp) Next(ctx context.Context) coldata.Batch {
r.mu.Lock()
defer r.mu.Unlock()
var err error
for {
switch r.state {
......@@ -591,7 +600,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Next(ctx context.Context) coldata.Batch {
return r.output
case relativeRankFinished:
if err := r.Close(ctx); err != nil {