Commit 515406a3 authored by craig[bot]

Merge #46583 #46587 #46595 #46668 #46673

46583: workload: enhance querybench r=yuzefovich a=yuzefovich

Release justification: non-production code changes.

The `querybench` workload has been enhanced to support queries that consist
of multiple statements on a single line. Previously, the workload would fail
on such queries because it attempted to `Prepare` the whole line, which would
error out. We still attempt to prepare the whole line, but if that fails, we
store the plain query and execute it with `DB.Query` rather than `Stmt.Query`.
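
A minimal sketch of that fallback, assuming a hypothetical `namedQuery` struct and helper functions (the actual workload code is organized differently):

```go
package sketch

import (
	"context"
	"database/sql"
	"fmt"
)

// namedQuery is a hypothetical holder for one line of the query file.
type namedQuery struct {
	name     string
	query    string    // raw query text; may contain multiple statements
	prepared *sql.Stmt // nil when Prepare failed
}

// prepareQueries tries to prepare each line; on failure it keeps the plain text.
func prepareQueries(db *sql.DB, lines []string) []namedQuery {
	queries := make([]namedQuery, 0, len(lines))
	for i, line := range lines {
		nq := namedQuery{name: fmt.Sprintf("q%d", i), query: line}
		// Prepare fails for lines that contain multiple statements; that is
		// fine, we simply fall back to executing the plain query text.
		if stmt, err := db.Prepare(line); err == nil {
			nq.prepared = stmt
		}
		queries = append(queries, nq)
	}
	return queries
}

// runQuery executes one query, using the prepared statement when available.
func runQuery(ctx context.Context, db *sql.DB, nq namedQuery) error {
	var rows *sql.Rows
	var err error
	if nq.prepared != nil {
		rows, err = nq.prepared.QueryContext(ctx)
	} else {
		rows, err = db.QueryContext(ctx, nq.query)
	}
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		// Drain the results; a real workload would also record latencies here.
	}
	return rows.Err()
}
```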

Fixes: #46547.
Fixes: #46607.

Release note: None

46587: colexec: optimize resetting of buffered groups in merge joiner r=yuzefovich a=yuzefovich

**colexec: fix resetting of buffered groups**

Release justification: bug fixes and low-risk updates to new
functionality.

Previously, whenever we needed to reset the buffered groups, we would close
the spilling queue and create a new one when needed. This is overkill: we can
simply reset the spilling queues we already have, which reduces the number of
allocations and improves performance.
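
A simplified sketch of the reuse pattern; `spillingQueue` and `mjBufferedGroup` below are stand-ins for the real colexec types (the actual change is visible in the diff further down):

```go
package sketch

import "context"

// spillingQueue is a stand-in for colexec's disk-backed queue.
type spillingQueue struct{ items int }

// reset discards the queue's contents but keeps its allocations (in-memory
// buffers, disk queue machinery) so they can be reused for the next group.
func (q *spillingQueue) reset(ctx context.Context) { q.items = 0 }

// mjBufferedGroup buffers one group of tuples on a merge joiner side.
type mjBufferedGroup struct {
	spillingQueue *spillingQueue
	numTuples     int
}

// reset prepares the buffered group for the next group of tuples. Previously
// the queue was closed here and re-allocated lazily on the next append; now it
// is allocated once at operator init and only reset in place.
func (bg *mjBufferedGroup) reset(ctx context.Context) {
	if bg.spillingQueue != nil {
		bg.spillingQueue.reset(ctx)
	}
	bg.numTuples = 0
}
```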

Addresses: #46502.

Release note: None

**logictest: make results of queries in vectorize test deterministic**

Release justification: non-production code changes.

Several queries in the `vectorize` logic test could have produced
non-deterministic results; this is now fixed.

Fixes: #46630.

Release note: None

46595: colexec: further optimize hash aggregator r=yuzefovich a=yuzefovich

Release justification: low-risk update to new functionality (it is
low-risk because it does not change anything fundamentally, rather only
improves the way we handle allocations and clear an internal state).

This commit optimizes how the hash aggregator works with selection vectors.
Previously, we maintained a map from hash code (`uint64`) to a slice of ints,
which resulted in creating a new int slice for every hash code the hash
aggregator ever encountered during its run. However, we process at most
(about) `batchTupleLimit` tuples at once, so we can see at most that many
distinct hash codes at a time. This observation allows us to keep a constant
number of int slices. To accommodate this, we introduce a map from hash code
to a "slot" in `[][]int`, and the map only contains hash codes that still need
to be processed; once a hash code has been processed, its entry is deleted.
This way both the map and the number of int slices stay constant throughout
the run of the hash aggregator.
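
A simplified, self-contained sketch of this slot scheme; the names below are illustrative, while the real implementation lives in the `hashAggregator` changes in the diff further down:

```go
package sketch

// selScratch illustrates the constant-size selection scratch space: sels is a
// fixed pool of int slices and hashCodeToSelsSlot maps only the hash codes
// that still need to be processed for the current batch.
type selScratch struct {
	sels               [][]int
	hashCodeToSelsSlot map[uint64]int
	nextSlot           int
}

func newSelScratch(maxTuples int) *selScratch {
	return &selScratch{
		sels:               make([][]int, maxTuples),
		hashCodeToSelsSlot: make(map[uint64]int),
	}
}

// startBatch is called before building selections for a new batch; the map is
// expected to be empty because finish() was called for every hash code.
func (s *selScratch) startBatch() { s.nextSlot = 0 }

// add records tuple index i under hashCode, reusing the slot assigned when the
// hash code was first seen in this batch.
func (s *selScratch) add(hashCode uint64, i int) {
	slot, ok := s.hashCodeToSelsSlot[hashCode]
	if !ok {
		slot = s.nextSlot
		s.hashCodeToSelsSlot[hashCode] = slot
		s.nextSlot++
	}
	s.sels[slot] = append(s.sels[slot], i)
}

// finish is called once all tuples with hashCode have been aggregated: the
// slice is truncated (keeping its capacity for reuse) and the map entry is
// deleted, so both the map and the slice pool stay constant in size.
func (s *selScratch) finish(hashCode uint64) {
	slot := s.hashCodeToSelsSlot[hashCode]
	s.sels[slot] = s.sels[slot][:0]
	delete(s.hashCodeToSelsSlot, hashCode)
}
```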

This commit also refactors `makeAggregateFuncs` to separate out the creation
of output types (this has some impact on the performance of the hash
aggregator, which creates aggregate functions for every group).

Release note: None

46668: cli,build: remove backtrace support r=yuzefovich a=petermattis

Support for Backtrace Labs' out-of-process tracers (i.e. ptrace) has been
disabled since Nov 2016. We never found this integration useful, and it is
past time to remove it.

Release justification: removal of unused code

Release note: None

46673: build: do not fail RocksDB build on compiler warnings r=yuzefovich a=petermattis

Disable `-Werror` for the RocksDB build which has recently started
complaining about a missing exception specification in a jemalloc header
with the newest version of Xcode.

Release justification: low-risk change to remove developer build
irritation. Should be a no-op for production builds.

Release note: None
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Peter Mattis <[email protected]>
......@@ -293,14 +293,6 @@
pruneopts = "UT"
revision = "4b99d0c2c99ec77eb3a42344d206a88997957495"
[[projects]]
branch = "master"
digest = "1:9163b457fd33634a890c83fba9f0fd20b8991101b55490fa705b1074a7bd2cdb"
name = "github.com/backtrace-labs/go-bcd"
packages = ["."]
pruneopts = "UT"
revision = "5d8e01b2f0438922289238fd3ba043761daf1102"
[[projects]]
branch = "master"
digest = "1:927f2f9632311e72cc76586aebff836c12c0132150afc2fe251f616e41522595"
......@@ -2023,7 +2015,6 @@
"github.com/aws/aws-sdk-go/service/s3",
"github.com/aws/aws-sdk-go/service/s3/s3manager",
"github.com/axiomhq/hyperloglog",
"github.com/backtrace-labs/go-bcd",
"github.com/benesch/cgosymbolizer",
"github.com/biogo/store/llrb",
"github.com/cenkalti/backoff",
......
......@@ -642,8 +642,7 @@ $(ROCKSDB_DIR)/Makefile: $(C_DEPS_DIR)/rocksdb-rebuild | bin/.submodules-initial
-DSNAPPY_LIBRARIES=$(LIBSNAPPY) -DSNAPPY_INCLUDE_DIR="$(SNAPPY_SRC_DIR);$(SNAPPY_DIR)" -DWITH_SNAPPY=ON \
$(if $(use-stdmalloc),,-DJEMALLOC_LIBRARIES=$(LIBJEMALLOC) -DJEMALLOC_INCLUDE_DIR=$(JEMALLOC_DIR)/include -DWITH_JEMALLOC=ON) \
-DCMAKE_BUILD_TYPE=$(if $(ENABLE_ROCKSDB_ASSERTIONS),Debug,Release) \
-DFAIL_ON_WARNINGS=$(if $(findstring windows,$(XGOOS)),0,1) \
-DUSE_RTTI=1
-DUSE_RTTI=1 -DFAIL_ON_WARNINGS=0
$(SNAPPY_DIR)/Makefile: $(C_DEPS_DIR)/snappy-rebuild | bin/.submodules-initialized
rm -rf $(SNAPPY_DIR)
......
......@@ -126,12 +126,6 @@ if test -e "${alternates_file}"; then
vols="${vols} --volume=${alternates_path}:${alternates_path}${cached_volume_mode}"
fi
backtrace_dir=${cockroach_toplevel}/../../cockroachlabs/backtrace
if test -d "${backtrace_dir}"; then
vols="${vols} --volume=${backtrace_dir}:/opt/backtrace${cached_volume_mode}"
vols="${vols} --volume=${backtrace_dir}/cockroach.cf:${container_home}/.coroner.cf${cached_volume_mode}"
fi
if [ "${BUILDER_HIDE_GOPATH_SRC:-}" != "1" ]; then
vols="${vols} --volume=${gopath0}/src:/go/src${cached_volume_mode}"
fi
......
Bump the version below when changing rocksdb CMake flags. Search for "BUILD
ARTIFACT CACHING" in build/common.mk for rationale.
13
14
// Copyright 2016 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
// +build linux freebsd
package cli
import (
"context"
"fmt"
"os"
"path/filepath"
"time"
"github.com/backtrace-labs/go-bcd"
"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"golang.org/x/sys/unix"
)
// Currently disabled as backtrace appears to be obscuring problems when test
// clusters encounter panics. See #10872.
const backtraceEnabled = false
func initBacktrace(logDir string, options ...stop.Option) *stop.Stopper {
if !backtraceEnabled {
return stop.NewStopper(options...)
}
ctx := context.TODO()
const ptracePath = "/opt/backtrace/bin/ptrace"
if _, err := os.Stat(ptracePath); err != nil {
log.Infof(ctx, "backtrace disabled: %s", err)
return stop.NewStopper(options...)
}
if err := bcd.EnableTracing(); err != nil {
log.Infof(ctx, "unable to enable backtrace: %s", err)
return stop.NewStopper(options...)
}
bcd.UpdateConfig(bcd.GlobalConfig{
PanicOnKillFailure: true,
ResendSignal: true,
RateLimit: time.Second * 3,
SynchronousPut: true,
})
// Use the default tracer implementation.
// false: Exclude system goroutines.
tracer := bcd.New(bcd.NewOptions{
IncludeSystemGs: false,
})
if err := tracer.SetOutputPath(logDir, 0755); err != nil {
log.Infof(ctx, "unable to set output path: %s", err)
// Not a fatal error, continue.
}
// Enable WARNING log output from the tracer.
tracer.AddOptions(nil, "-L", "WARNING")
info := build.GetInfo()
tracer.AddKV(nil, "cgo-compiler", info.CgoCompiler)
tracer.AddKV(nil, "go-version", info.GoVersion)
tracer.AddKV(nil, "platform", info.Platform)
tracer.AddKV(nil, "type", info.Type)
tracer.AddKV(nil, "tag", info.Tag)
tracer.AddKV(nil, "time", info.Time)
// Register for traces on signal reception.
tracer.SetSigset(
unix.SIGABRT,
unix.SIGBUS,
unix.SIGFPE,
unix.SIGILL,
unix.SIGSEGV,
)
bcd.Register(tracer)
// Hook log.Fatal*.
log.SetExitFunc(false /* hideStack */, func(code int) {
_ = bcd.Trace(tracer, fmt.Errorf("exit %d", code), nil)
os.Exit(code)
})
options = append(options,
stop.OnPanic(func(val interface{}) {
err, ok := val.(error)
if !ok {
err = fmt.Errorf("%v", val)
}
_ = bcd.Trace(tracer, err, nil)
panic(val)
}))
stopper := stop.NewStopper(options...)
// Internally, backtrace uses an external program (/opt/backtrace/bin/ptrace)
// to generate traces. We direct the stdout for this program to a file for
// debugging our usage of backtrace.
if f, err := os.OpenFile(filepath.Join(logDir, "backtrace.out"),
os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666); err != nil {
log.Infof(ctx, "unable to open: %s", err)
} else {
stopper.AddCloser(stop.CloserFn(func() {
f.Close()
}))
tracer.SetPipes(nil, f)
}
tracer.SetLogLevel(bcd.LogMax)
log.Infof(ctx, "backtrace enabled")
return stopper
}
// Copyright 2016 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
// +build !linux,!freebsd
package cli
import "github.com/cockroachdb/cockroach/pkg/util/stop"
func initBacktrace(logDir string, options ...stop.Option) *stop.Stopper {
return stop.NewStopper(options...)
}
......@@ -127,7 +127,6 @@ func initCLIDefaults() {
startCtx.listeningURLFile = ""
startCtx.pidFile = ""
startCtx.inBackground = false
startCtx.backtraceOutputDir = ""
quitCtx.serverDecommission = false
......@@ -325,9 +324,6 @@ var startCtx struct {
// logging settings specific to file logging.
logDir log.DirName
// directory to use for logging backtrace outputs.
backtraceOutputDir string
}
// quitCtx captures the command-line parameters of the `quit` command.
......
......@@ -293,10 +293,8 @@ func testServerArgsForTransientCluster(nodeID roachpb.NodeID, joinAddr string) b
storeSpec.StickyInMemoryEngineID = fmt.Sprintf("demo-node%d", nodeID)
args := base.TestServerArgs{
PartOfCluster: true,
Stopper: initBacktrace(
fmt.Sprintf("%s/demo-node%d", startCtx.backtraceOutputDir, nodeID),
),
PartOfCluster: true,
Stopper: stop.NewStopper(),
JoinAddr: joinAddr,
DisableTLSForHTTP: true,
StoreSpecs: []base.StoreSpec{storeSpec},
......
......@@ -1199,7 +1199,6 @@ func setupAndInitializeLoggingAndProfiling(
if p := logOutputDirectory(); p != "" {
outputDirectory = p
}
startCtx.backtraceOutputDir = outputDirectory
serverCfg.GoroutineDumpDirName = filepath.Join(outputDirectory, base.GoroutineDumpDir)
serverCfg.HeapProfileDirName = filepath.Join(outputDirectory, base.HeapProfileDir)
......@@ -1250,7 +1249,7 @@ func setupAndInitializeLoggingAndProfiling(
// Disable Stopper task tracking as performing that call site tracking is
// moderately expensive (certainly outweighing the infrequent benefit it
// provides).
stopper = initBacktrace(outputDirectory)
stopper = stop.NewStopper()
log.Event(ctx, "initialized profiles")
return stopper, nil
......
......@@ -211,8 +211,13 @@ func NewOrderedAggregator(
isScalar: isScalar,
}
a.aggregateFuncs, a.outputTypes, err = makeAggregateFuncs(a.allocator, aggTypes, aggFns)
a.aggregateFuncs, err = makeAggregateFuncs(a.allocator, aggTypes, aggFns)
if err != nil {
return nil, errors.AssertionFailedf(
"this error should have been checked in isAggregateSupported\n%+v", err,
)
}
a.outputTypes, err = makeAggregateFuncsOutputTypes(aggTypes, aggFns)
if err != nil {
return nil, errors.AssertionFailedf(
"this error should have been checked in isAggregateSupported\n%+v", err,
......@@ -224,9 +229,8 @@ func NewOrderedAggregator(
func makeAggregateFuncs(
allocator *Allocator, aggTyps [][]coltypes.T, aggFns []execinfrapb.AggregatorSpec_Func,
) ([]aggregateFunc, []coltypes.T, error) {
) ([]aggregateFunc, error) {
funcs := make([]aggregateFunc, len(aggFns))
outTyps := make([]coltypes.T, len(aggFns))
for i := range aggFns {
var err error
......@@ -250,26 +254,46 @@ func makeAggregateFuncs(
case execinfrapb.AggregatorSpec_BOOL_OR:
funcs[i] = newBoolOrAgg()
default:
return nil, nil, errors.Errorf("unsupported columnar aggregate function %s", aggFns[i].String())
return nil, errors.Errorf("unsupported columnar aggregate function %s", aggFns[i].String())
}
if err != nil {
return nil, err
}
}
return funcs, nil
}
func makeAggregateFuncsOutputTypes(
aggTyps [][]coltypes.T, aggFns []execinfrapb.AggregatorSpec_Func,
) ([]coltypes.T, error) {
outTyps := make([]coltypes.T, len(aggFns))
for i := range aggFns {
// Set the output type of the aggregate.
switch aggFns[i] {
case execinfrapb.AggregatorSpec_COUNT_ROWS, execinfrapb.AggregatorSpec_COUNT:
// TODO(jordan): this is a somewhat of a hack. The aggregate functions
// should come with their own output types, somehow.
outTyps[i] = coltypes.Int64
default:
case
execinfrapb.AggregatorSpec_ANY_NOT_NULL,
execinfrapb.AggregatorSpec_AVG,
execinfrapb.AggregatorSpec_SUM,
execinfrapb.AggregatorSpec_SUM_INT,
execinfrapb.AggregatorSpec_MIN,
execinfrapb.AggregatorSpec_MAX,
execinfrapb.AggregatorSpec_BOOL_AND,
execinfrapb.AggregatorSpec_BOOL_OR:
// Output types are the input types for now.
outTyps[i] = aggTyps[i][0]
}
if err != nil {
return nil, nil, err
default:
return nil, errors.Errorf("unsupported columnar aggregate function %s", aggFns[i].String())
}
}
return funcs, outTyps, nil
return outTyps, nil
}
func (a *orderedAggregator) initWithOutputBatchSize(outputSize int) {
......@@ -470,7 +494,7 @@ func isAggregateSupported(
return false, errors.Newf("sum_int is only supported on Int64 through vectorized")
}
}
_, outputTypes, err := makeAggregateFuncs(
_, err = makeAggregateFuncs(
nil, /* allocator */
[][]coltypes.T{aggTypes},
[]execinfrapb.AggregatorSpec_Func{aggFn},
......@@ -478,6 +502,13 @@ func isAggregateSupported(
if err != nil {
return false, err
}
outputTypes, err := makeAggregateFuncsOutputTypes(
[][]coltypes.T{aggTypes},
[]execinfrapb.AggregatorSpec_Func{aggFn},
)
if err != nil {
return false, err
}
_, retType, err := execinfrapb.GetAggregateInfo(aggFn, inputTypes...)
if err != nil {
return false, err
......
......@@ -702,6 +702,7 @@ func BenchmarkAggregator(b *testing.B) {
rng, _ := randutil.NewPseudoRand()
ctx := context.Background()
const bytesFixedLength = 8
for _, aggFn := range []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_ANY_NOT_NULL,
execinfrapb.AggregatorSpec_AVG,
......@@ -716,12 +717,18 @@ func BenchmarkAggregator(b *testing.B) {
fName := execinfrapb.AggregatorSpec_Func_name[int32(aggFn)]
b.Run(fName, func(b *testing.B) {
for _, agg := range aggTypes {
for _, typ := range []coltypes.T{coltypes.Int64, coltypes.Decimal} {
for typIdx, typ := range []coltypes.T{coltypes.Int64, coltypes.Decimal, coltypes.Bytes} {
for _, groupSize := range []int{1, 2, coldata.BatchSize() / 2, coldata.BatchSize()} {
for _, hasNulls := range []bool{false, true} {
for _, numInputBatches := range []int{64} {
if aggFn == execinfrapb.AggregatorSpec_BOOL_AND || aggFn == execinfrapb.AggregatorSpec_BOOL_OR {
typ = coltypes.Bool
if typIdx > 0 {
// We don't need to run the benchmark of bool_and and
// bool_or multiple times, so we skip all runs except
// for the first one.
continue
}
}
b.Run(fmt.Sprintf("%s/%s/groupSize=%d/hasNulls=%t/numInputBatches=%d", agg.name, typ.String(),
groupSize, hasNulls, numInputBatches),
......@@ -740,29 +747,18 @@ func BenchmarkAggregator(b *testing.B) {
}
groups[i] = int64(curGroup)
}
nullProb := 0.0
if hasNulls {
nulls := cols[1].Nulls()
for i := 0; i < nTuples; i++ {
if rng.Float64() < nullProbability {
nulls.SetNull(i)
}
}
nullProb = nullProbability
}
switch typ {
case coltypes.Int64:
coldata.RandomVec(rng, typ, bytesFixedLength, cols[1], nTuples, nullProb)
if typ == coltypes.Int64 && aggFn == execinfrapb.AggregatorSpec_SUM {
// Summation of random Int64 values can lead to
// overflow, and we will panic. To go around it, we
// restrict the range of values.
vals := cols[1].Int64()
for i := range vals {
vals[i] = rng.Int63() % 1024
}
case coltypes.Decimal:
vals := cols[1].Decimal()
for i := range vals {
vals[i].SetInt64(rng.Int63() % 1024)
}
case coltypes.Bool:
vals := cols[1].Bool()
for i := range vals {
vals[i] = rng.Float64() < 0.5
vals[i] = vals[i] % 1024
}
}
source := newChunkingBatchSource(colTypes, cols, nTuples)
......
......@@ -79,8 +79,7 @@ type hashAggregator struct {
valCols []uint32
// batchTupleLimit limits the number of tuples the aggregator will buffer
// before it starts to perform aggregation. The maximum value of this field
// is math.MaxUint16 - coldata.BatchSize().
// before it starts to perform aggregation.
batchTupleLimit int
// state stores the current state of hashAggregator.
......@@ -88,6 +87,8 @@ type hashAggregator struct {
scratch struct {
coldata.Batch
// vecs stores "unwrapped" batch.
vecs []coldata.Vec
// sels stores the intermediate selection vector for each hash code. It
// is maintained in such a way that when for a particular hashCode
......@@ -95,13 +96,20 @@ type hashAggregator struct {
// length 0. Also, onlineAgg() method will reset all modified slices to
// have zero length once it is done processing all tuples in the batch,
// this allows us to not reset the slices for all possible hash codes.
// TODO(yuzefovich): instead of having a map from hashCode to []int
// (which could result in having many int slices), we could use
// constant number of such slices (probably batchTupleLimit of them)
// and have a map from hashCode to an index in [][]int that would do
//
// Instead of having a map from hashCode to []int (which could result
// in having many int slices), we are using a constant number of such
// slices and have a map from hashCode to a "slot" in sels that does
// the "translation." The key insight here is that we will have at most
// batchTupleLimit different hashCodes at once.
sels map[uint64][]int
// batchTupleLimit (plus - possibly - constant excess) different
// hashCodes at once.
sels [][]int
// hashCodeToSelsSlot is a mapping from the hashCode to a slot in sels
// slice. New keys are added to this map when building the selections
// when new hashCode is encountered, and keys are deleted from the map
// in online aggregation phase once the tuples with the corresponding
// hash codes have been processed.
hashCodeToSelsSlot map[uint64]int
// group is a boolean vector where "true" represent the beginning of a group
// in the column. It is shared among all aggregation functions. Since
......@@ -123,6 +131,8 @@ type hashAggregator struct {
// bufferedBatch because in the worst case where all keys in the grouping
// columns are distinct, we need to store every single key in the input.
keyMapping coldata.Batch
// keyMappingVecs stores "unwrapped" keyMapping batch.
keyMappingVecs []coldata.Vec
output struct {
coldata.Batch
......@@ -155,6 +165,7 @@ type hashAggregator struct {
// hashBuffer stores hash values for each tuple in the buffered batch.
hashBuffer []uint64
alloc hashAggFuncsAlloc
cancelChecker CancelChecker
decimalScratch decimalOverloadScratch
}
......@@ -213,7 +224,7 @@ func NewHashAggregator(
}
}
_, outputTypes, err := makeAggregateFuncs(allocator, aggTyps, aggFns)
outputTypes, err := makeAggregateFuncsOutputTypes(aggTyps, aggFns)
if err != nil {
return nil, errors.AssertionFailedf(
"this error should have been checked in isAggregateSupported\n%+v", err,
......@@ -258,14 +269,19 @@ func (op *hashAggregator) Init() {
// to accommodate the case where sometimes number of buffered tuples exceeds
// op.batchTupleLimit. This is because we perform checks after appending the
// input tuples to the scratch buffer.
op.scratch.Batch =
op.allocator.NewMemBatchWithSize(op.valTypes, op.batchTupleLimit+coldata.BatchSize())
op.scratch.sels = make(map[uint64][]int)
op.scratch.group = make([]bool, op.batchTupleLimit+coldata.BatchSize())
maxBufferedTuples := op.batchTupleLimit + coldata.BatchSize()
op.scratch.Batch = op.allocator.NewMemBatchWithSize(op.valTypes, maxBufferedTuples)
op.scratch.vecs = op.scratch.ColVecs()
op.scratch.sels = make([][]int, maxBufferedTuples)
op.scratch.hashCodeToSelsSlot = make(map[uint64]int)
op.scratch.group = make([]bool, maxBufferedTuples)
// Eventually, op.keyMapping will contain as many tuples as there are
// groups in the input, but we don't know that number upfront, so we
// allocate it with some reasonably sized constant capacity.
op.keyMapping = op.allocator.NewMemBatchWithSize(op.groupTypes, op.batchTupleLimit)
op.keyMappingVecs = op.keyMapping.ColVecs()
op.hashBuffer = make([]uint64, op.batchTupleLimit+coldata.BatchSize())
op.hashBuffer = make([]uint64, maxBufferedTuples)
}
func (op *hashAggregator) Next(ctx context.Context) coldata.Batch {
......@@ -364,9 +380,9 @@ func (op *hashAggregator) bufferBatch(ctx context.Context) bool {
break
}
bufferedTupleCount += batchSize
op.allocator.PerformOperation(op.scratch.ColVecs(), func() {
op.allocator.PerformOperation(op.scratch.vecs, func() {
for i, colIdx := range op.valCols {
op.scratch.ColVec(i).Append(
op.scratch.vecs[i].Append(
coldata.SliceArgs{
ColType: op.valTypes[i],
Src: b.ColVec(int(colIdx)),
......@@ -393,7 +409,7 @@ func (op *hashAggregator) buildSelectionForEachHashCode(ctx context.Context) {
rehash(ctx,
hashBuffer,
op.valTypes[colIdx],
op.scratch.ColVec(int(colIdx)),
op.scratch.vecs[colIdx],
nKeys,
nil, /* sel */
op.cancelChecker,
......@@ -408,13 +424,17 @@ func (op *hashAggregator) buildSelectionForEachHashCode(ctx context.Context) {
// they all are of zero length here (see the comment for op.scratch.sels
// for context).
nextSelsSlot := 0
// We can use selIdx to index into op.scratch since op.scratch never has a
// selection vector.
for selIdx, hashCode := range hashBuffer {
if _, ok := op.scratch.sels[hashCode]; !ok {
op.scratch.sels[hashCode] = make([]int, 0)
selsSlot, ok := op.scratch.hashCodeToSelsSlot[hashCode]
if !ok {
selsSlot = nextSelsSlot
op.scratch.hashCodeToSelsSlot[hashCode] = selsSlot
nextSelsSlot++
}
op.scratch.sels[hashCode] = append(op.scratch.sels[hashCode], selIdx)
op.scratch.sels[selsSlot] = append(op.scratch.sels[selsSlot], selIdx)
}
}
......@@ -422,18 +442,15 @@ func (op *hashAggregator) buildSelectionForEachHashCode(ctx context.Context) {
// aggFunctions for each group if they don't exist. Then it calls Compute()
// on each aggregation function to perform aggregation.
func (op *hashAggregator) onlineAgg() {
// Unwrap colvecs to avoid calling .ColVec() in a tight loop each time.
keyMappingVecs := op.keyMapping.ColVecs()
scratchBufferVecs := op.scratch.ColVecs()
for _, hashCode := range op.hashBuffer {
remaining := op.scratch.sels[hashCode]
if len(remaining) == 0 {
selsSlot, ok := op.scratch.hashCodeToSelsSlot[hashCode]
if !ok {
// It is possible that multiple tuples have the same hashCode, and
// we process all such tuples when we encounter the first of these
// tuples.
continue
}
remaining := op.scratch.sels[selsSlot]
var anyMatched bool
......@@ -460,7 +477,7 @@ func (op *hashAggregator) onlineAgg() {
op.aggFuncMap[hashCode] = make([]*hashAggFuncs, 0, 1)
}
// Stage 2: Build aggregate function that doesn't exist then perform
// Stage 2: Build aggregate function that doesn't exist, then perform
// aggregation on the newly created aggregate function.
for len(remaining) > 0 {
// Record the selection vector index of the beginning of the group.
......@@ -468,16 +485,17 @@ func (op *hashAggregator) onlineAgg() {
// Build new agg functions.
keyIdx := op.keyMapping.Length()
aggFunc := &hashAggFuncs{keyIdx: keyIdx}
aggFunc := op.alloc.newHashAggFuncs()
aggFunc.keyIdx = keyIdx
// Store the key of the current aggregating group into keyMapping.
op.allocator.PerformOperation(keyMappingVecs, func() {
op.allocator.PerformOperation(op.keyMappingVecs, func() {
for keyIdx, colIdx := range op.groupCols {
// TODO(azhng): Try to preallocate enough memory so instead of
// .Append() we can use execgen.SET to improve the
// performance.
keyMappingVecs[keyIdx].Append(coldata.SliceArgs{
Src: scratchBufferVecs[colIdx],
op.keyMappingVecs[keyIdx].Append(coldata.SliceArgs{
Src: op.scratch.vecs[colIdx],
ColType: op.valTypes[colIdx],
DestIdx: aggFunc.keyIdx,
SrcStartIdx: remaining[0],
......@@ -487,8 +505,7 @@ func (op *hashAggregator) onlineAgg() {
op.keyMapping.SetLength(keyIdx + 1)
})
aggFunc.fns, _, _ =
makeAggregateFuncs(op.allocator, op.aggTypes, op.aggFuncs)
aggFunc.fns, _ = makeAggregateFuncs(op.allocator, op.aggTypes, op.aggFuncs)
op.aggFuncMap[hashCode] = append(op.aggFuncMap[hashCode], aggFunc)
// Select rest of the tuples that matches the current key. We don't need
......@@ -510,7 +527,9 @@ func (op *hashAggregator) onlineAgg() {
// We have processed all tuples with this hashCode, so we should reset
// the length of the corresponding slice.
op.scratch.sels[hashCode] = op.scratch.sels[hashCode][:0]
op.scratch.sels[selsSlot] = op.scratch.sels[selsSlot][:0]
// We also need to delete the hashCode from the mapping.
delete(op.scratch.hashCodeToSelsSlot, hashCode)
}
}
......@@ -533,8 +552,6 @@ func (op *hashAggregator) reset(ctx context.Context) {
op.keyMapping.ResetInternalBatch()
op.keyMapping.SetLength(0)
op.scratch.sels = make(map[uint64][]int)
}
// hashAggFuncs stores the aggregation functions for the corresponding
......@@ -560,3 +577,20 @@ func (v *hashAggFuncs) compute(b coldata.Batch, aggCols [][]uint32) {
fn.Compute(b, aggCols[fnIdx])
}
}
const hashAggFuncsAllocSize = 16
// hashAggFuncsAlloc is a utility struct that batches allocations of
// hashAggFuncs.
type hashAggFuncsAlloc struct {
buf []hashAggFuncs
}
func (a *hashAggFuncsAlloc) newHashAggFuncs() *hashAggFuncs {
if len(a.buf) == 0 {
a.buf = make([]hashAggFuncs, hashAggFuncsAllocSize)
}
ret := &a.buf[0]
a.buf = a.buf[1:]
return ret
}
......@@ -103,8 +103,8 @@ type mjBufferedGroup struct {
}
func (bg *mjBufferedGroup) reset(ctx context.Context) {
if err := bg.close(ctx); err != nil {
execerror.VectorizedInternalPanic(err)
if bg.spillingQueue != nil {
bg.spillingQueue.reset(ctx)
}
bg.numTuples = 0
}
......@@ -449,10 +449,18 @@ func (o *mergeJoinBase) initWithOutputBatchSize(outBatchSize int) {
o.outputBatchSize = 1<<16 - 1
}
o.proberState.lBufferedGroup.spillingQueue = newSpillingQueue(
o.unlimitedAllocator, o.left.sourceTypes, o.memoryLimit,
o.diskQueueCfg, o.fdSemaphore, coldata.BatchSize(), o.diskAcc,
)
o.proberState.lBufferedGroup.firstTuple = make([]coldata.Vec, len(o.left.sourceTypes))
for colIdx, colType := range o.left.sourceTypes {
o.proberState.lBufferedGroup.firstTuple[colIdx] = o.unlimitedAllocator.NewMemColumn(colType, 1)
}
o.proberState.rBufferedGroup.spillingQueue = newRewindableSpillingQueue(
o.unlimitedAllocator, o.right.sourceTypes, o.memoryLimit,
o.diskQueueCfg, o.fdSemaphore, coldata.BatchSize(), o.diskAcc,
)
o.proberState.rBufferedGroup.firstTuple = make([]coldata.Vec, len(o.right.sourceTypes))
for colIdx, colType := range o.right.sourceTypes {
o.proberState.rBufferedGroup.firstTuple[colIdx] = o.unlimitedAllocator.NewMemColumn(colType, 1)
......@@ -493,12 +501,6 @@ func (o *mergeJoinBase) appendToBufferedGroup(
if input == &o.left {
sourceTypes = o.left.sourceTypes
bufferedGroup = &o.proberState.lBufferedGroup
if bufferedGroup.spillingQueue == nil {
bufferedGroup.spillingQueue = newSpillingQueue(
o.unlimitedAllocator, o.left.sourceTypes, o.memoryLimit,
o.diskQueueCfg, o.fdSemaphore, coldata.BatchSize(), o.diskAcc,
)
}
// TODO(yuzefovich): uncomment when spillingQueue actually copies the
// enqueued batches when those are kept in memory.
//if o.scratch.lBufferedGroupBatch == nil {
......@@ -508,12 +510,6 @@ func (o *mergeJoinBase) appendToBufferedGroup(
} else {
sourceTypes = o.right.sourceTypes
bufferedGroup = &o.proberState.rBufferedGroup
if bufferedGroup.spillingQueue == nil {
bufferedGroup.spillingQueue = newRewindableSpillingQueue(
o.unlimitedAllocator, o.right.sourceTypes, o.memoryLimit,
o.diskQueueCfg, o.fdSemaphore, coldata.BatchSize(), o.diskAcc,
)
}
// TODO(yuzefovich): uncomment when spillingQueue actually copies the
// enqueued batches when those are kept in memory.
//if o.scratch.rBufferedGroupBatch == nil {
......@@ -716,17 +712,11 @@ func (o *mergeJoinBase) IdempotentClose(ctx context.Context) error {
}
}
}
if o.proberState.lBufferedGroup.spillingQueue != nil {
if err := o.proberState.lBufferedGroup.close(ctx); err != nil {
lastErr = err
}
o.proberState.lBufferedGroup.spillingQueue = nil
if err := o.proberState.lBufferedGroup.