Commit 29e98580 authored by sumeerbhola's avatar sumeerbhola

sql,execinfrapb,rowexec: DistSQLPlanner and related changes for inverted join

Release note: None
parent 0b65365f
......@@ -290,6 +290,7 @@ func (dsp *DistSQLPlanner) mustWrapNode(planCtx *PlanningCtx, node planNode) boo
case *groupNode:
case *indexJoinNode:
case *invertedFilterNode:
case *invertedJoinNode:
case *joinNode:
case *limitNode:
case *lookupJoinNode:
......@@ -357,6 +358,15 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
case *invertedFilterNode:
return cannotDistribute, nil
case *invertedJoinNode:
if err := checkExpr(n.onExpr); err != nil {
return cannotDistribute, err
}
if _, err := checkSupportForPlanNode(n.input); err != nil {
return cannotDistribute, err
}
return shouldDistribute, nil
case *joinNode:
if err := checkExpr(n.pred.onCond); err != nil {
return cannotDistribute, err
......@@ -1908,66 +1918,151 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
}
joinReaderSpec.LookupColumnsAreKey = n.eqColsAreKey
numInputNodeCols, planToStreamColMap, post, types :=
mappingHelperForLookupJoins(plan, n.input, n.table)
// Set the ON condition.
if n.onCond != nil {
indexVarMap := makeIndexVarMapForLookupJoins(numInputNodeCols, n.table, plan, &post)
var err error
joinReaderSpec.OnExpr, err = physicalplan.MakeExpression(
n.onCond, planCtx, indexVarMap,
)
if err != nil {
return nil, err
}
}
if n.joinType == sqlbase.LeftSemiJoin || n.joinType == sqlbase.LeftAntiJoin {
// For anti/semi join, we only produce the input columns.
planToStreamColMap, post.OutputColumns, types = truncateToInputForLookupJoins(
numInputNodeCols, planToStreamColMap, post.OutputColumns, types)
}
// Instantiate one join reader for every stream.
plan.AddNoGroupingStage(
execinfrapb.ProcessorCoreUnion{JoinReader: &joinReaderSpec},
post,
types,
dsp.convertOrdering(planReqOrdering(n), planToStreamColMap),
)
plan.PlanToStreamColMap = planToStreamColMap
return plan, nil
}
// mappingHelperForLookupJoins creates slices etc. for the columns of
// lookup-style joins (that involve an input that is used to lookup from a
// table).
func mappingHelperForLookupJoins(
plan *PhysicalPlan, input planNode, table *scanNode,
) (
numInputNodeCols int,
planToStreamColMap []int,
post execinfrapb.PostProcessSpec,
outTypes []*types.T,
) {
// The n.table node can be configured with an arbitrary set of columns. Apply
// the corresponding projection.
// The internal schema of the join reader is:
// <input columns>... <table columns>...
numLeftCols := len(plan.ResultTypes)
numOutCols := numLeftCols + len(n.table.cols)
post := execinfrapb.PostProcessSpec{Projection: true}
numOutCols := numLeftCols + len(table.cols)
post = execinfrapb.PostProcessSpec{Projection: true}
post.OutputColumns = make([]uint32, numOutCols)
types := make([]*types.T, numOutCols)
outTypes = make([]*types.T, numOutCols)
for i := 0; i < numLeftCols; i++ {
types[i] = plan.ResultTypes[i]
outTypes[i] = plan.ResultTypes[i]
post.OutputColumns[i] = uint32(i)
}
for i := range n.table.cols {
types[numLeftCols+i] = n.table.cols[i].Type
ord := tableOrdinal(n.table.desc, n.table.cols[i].ID, n.table.colCfg.visibility)
for i := range table.cols {
outTypes[numLeftCols+i] = table.cols[i].Type
ord := tableOrdinal(table.desc, table.cols[i].ID, table.colCfg.visibility)
post.OutputColumns[numLeftCols+i] = uint32(numLeftCols + ord)
}
// Map the columns of the lookupJoinNode to the result streams of the
// JoinReader.
numInputNodeCols := len(planColumns(n.input))
planToStreamColMap := makePlanToStreamColMap(numInputNodeCols + len(n.table.cols))
numInputNodeCols = len(planColumns(input))
planToStreamColMap = makePlanToStreamColMap(numInputNodeCols + len(table.cols))
copy(planToStreamColMap, plan.PlanToStreamColMap)
for i := range n.table.cols {
for i := range table.cols {
planToStreamColMap[numInputNodeCols+i] = numLeftCols + i
}
return numInputNodeCols, planToStreamColMap, post, outTypes
}
func makeIndexVarMapForLookupJoins(
numInputNodeCols int, table *scanNode, plan *PhysicalPlan, post *execinfrapb.PostProcessSpec,
) (indexVarMap []int) {
// Note that (regardless of the join type or the OutputColumns projection)
// the inverted expression and ON condition refers to the input columns with
// var indexes 0 to numInputNodeCols-1 and to table columns with var indexes
// starting from numInputNodeCols.
indexVarMap = makePlanToStreamColMap(numInputNodeCols + len(table.cols))
copy(indexVarMap, plan.PlanToStreamColMap)
numLeftCols := len(plan.ResultTypes)
for i := range table.cols {
indexVarMap[numInputNodeCols+i] = int(post.OutputColumns[numLeftCols+i])
}
return indexVarMap
}
func truncateToInputForLookupJoins(
numInputNodeCols int, planToStreamColMap []int, outputColumns []uint32, outTypes []*types.T,
) ([]int, []uint32, []*types.T) {
planToStreamColMap = planToStreamColMap[:numInputNodeCols]
outputColumns = outputColumns[:numInputNodeCols]
outTypes = outTypes[:numInputNodeCols]
return planToStreamColMap, outputColumns, outTypes
}
func (dsp *DistSQLPlanner) createPlanForInvertedJoin(
planCtx *PlanningCtx, n *invertedJoinNode,
) (*PhysicalPlan, error) {
plan, err := dsp.createPhysPlanForPlanNode(planCtx, n.input)
if err != nil {
return nil, err
}
invertedJoinerSpec := execinfrapb.InvertedJoinerSpec{
Table: *n.table.desc.TableDesc(),
Type: n.joinType,
}
invertedJoinerSpec.IndexIdx, err = getIndexIdx(n.table.index, n.table.desc)
if err != nil {
return nil, err
}
invertedJoinerSpec.LookupColumn = uint32(plan.PlanToStreamColMap[n.inputCol])
numInputNodeCols, planToStreamColMap, post, types :=
mappingHelperForLookupJoins(plan, n.input, n.table)
indexVarMap := makeIndexVarMapForLookupJoins(numInputNodeCols, n.table, plan, &post)
if invertedJoinerSpec.InvertedExpr, err = physicalplan.MakeExpression(
n.invertedExpr, planCtx, indexVarMap,
); err != nil {
return nil, err
}
// Set the ON condition.
if n.onCond != nil {
// Note that (regardless of the join type or the OutputColumns projection)
// the ON condition refers to the input columns with var indexes 0 to
// numInputNodeCols-1 and to table columns with var indexes starting from
// numInputNodeCols.
indexVarMap := makePlanToStreamColMap(numInputNodeCols + len(n.table.cols))
copy(indexVarMap, plan.PlanToStreamColMap)
for i := range n.table.cols {
indexVarMap[numInputNodeCols+i] = int(post.OutputColumns[numLeftCols+i])
}
var err error
joinReaderSpec.OnExpr, err = physicalplan.MakeExpression(
n.onCond, planCtx, indexVarMap,
)
if err != nil {
if n.onExpr != nil {
if invertedJoinerSpec.OnExpr, err = physicalplan.MakeExpression(
n.onExpr, planCtx, indexVarMap,
); err != nil {
return nil, err
}
}
if n.joinType == sqlbase.LeftSemiJoin || n.joinType == sqlbase.LeftAntiJoin {
// For anti/semi join, we only produce the input columns.
planToStreamColMap = planToStreamColMap[:numInputNodeCols]
post.OutputColumns = post.OutputColumns[:numInputNodeCols]
types = types[:numInputNodeCols]
planToStreamColMap, post.OutputColumns, types = truncateToInputForLookupJoins(
numInputNodeCols, planToStreamColMap, post.OutputColumns, types)
}
// Instantiate one join reader for every stream.
// Instantiate one inverted joiner for every stream.
plan.AddNoGroupingStage(
execinfrapb.ProcessorCoreUnion{JoinReader: &joinReaderSpec},
execinfrapb.ProcessorCoreUnion{InvertedJoiner: &invertedJoinerSpec},
post,
types,
dsp.convertOrdering(planReqOrdering(n), planToStreamColMap),
......@@ -2324,6 +2419,9 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode(
case *invertedFilterNode:
plan, err = dsp.createPlanForInvertedFilter(planCtx, n)
case *invertedJoinNode:
plan, err = dsp.createPlanForInvertedJoin(planCtx, n)
case *joinNode:
plan, err = dsp.createPlanForJoin(planCtx, n)
......
......@@ -161,6 +161,7 @@ func (jr *JoinReaderSpec) summary() (string, []string) {
if jr.Type != sqlbase.InnerJoin {
details = append(details, joinTypeDetail(jr.Type))
}
// TODO(yuzefovich): swap to make it [email protected]
details = append(details, fmt.Sprintf("%[email protected]%s", index, jr.Table.Name))
if jr.LookupColumns != nil {
details = append(details, fmt.Sprintf("Lookup join on: %s", colListStr(jr.LookupColumns)))
......@@ -295,6 +296,23 @@ func (zj *ZigzagJoinerSpec) summary() (string, []string) {
return name, details
}
// summary implements the diagramCellType interface.
func (ij *InvertedJoinerSpec) summary() (string, []string) {
index := ij.Table.Indexes[ij.IndexIdx-1].Name
details := make([]string, 0, 4)
if ij.Type != sqlbase.InnerJoin {
details = append(details, joinTypeDetail(ij.Type))
}
// TODO(yuzefovich): swap to make it [email protected]
details = append(details, fmt.Sprintf("%[email protected]%s", index, ij.Table.Name))
details = append(details, fmt.Sprintf("Inverted join on: @%d", ij.LookupColumn+1))
details = append(details, fmt.Sprintf("InvertedExpr %s", ij.InvertedExpr))
if !ij.OnExpr.Empty() {
details = append(details, fmt.Sprintf("ON %s", ij.OnExpr))
}
return "InvertedJoiner", details
}
// summary implements the diagramCellType interface.
func (s *SorterSpec) summary() (string, []string) {
details := []string{s.OutputOrdering.diagramString()}
......
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package sql
import (
"context"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
)
type invertedJoinNode struct {
input planNode
table *scanNode
// joinType is one of INNER, LEFT_OUTER, LEFT_SEMI, LEFT_ANTI.
joinType sqlbase.JoinType
// The inverted expression to evaluate.
invertedExpr tree.TypedExpr
// inputCol identifies the column from the input to be used for the
// inverted join.
inputCol int32
// columns are the produced columns, namely the input columns and (unless the
// join type is semi or anti join) the columns in the table scanNode.
columns sqlbase.ResultColumns
// onExpr is any ON condition to be used in conjunction with the inverted
// expression.
onExpr tree.TypedExpr
}
// CanParallelize indicates whether the fetchers can parallelize the
// batches of lookups that can be performed. This should be kept in
// sync with the behavior of invertedJoiner scan behavior.
func (ij *invertedJoinNode) CanParallelize() bool {
return true
}
func (ij *invertedJoinNode) startExec(params runParams) error {
panic("invertedJoinNode cannot be run in local mode")
}
func (ij *invertedJoinNode) Next(params runParams) (bool, error) {
panic("invertedJoinNode cannot be run in local mode")
}
func (ij *invertedJoinNode) Values() tree.Datums {
panic("invertedJoinNode cannot be run in local mode")
}
func (ij *invertedJoinNode) Close(ctx context.Context) {
ij.input.Close(ctx)
ij.table.Close(ctx)
}
statement ok
CREATE TABLE ltable(
lk int primary key,
geom geometry
)
statement ok
INSERT INTO ltable VALUES
(1, 'POINT(3.0 3.0)'),
(2, 'POINT(4.5 4.5)'),
(3, 'POINT(1.5 1.5)')
statement ok
CREATE TABLE rtable(
rk int primary key,
geom geometry,
INVERTED INDEX geom_index(geom)
)
statement ok
INSERT INTO rtable VALUES
(11, 'POINT(1.0 1.0)'),
(12, 'LINESTRING(1.0 1.0, 2.0 2.0)'),
(13, 'POINT(3.0 3.0)'),
(14, 'LINESTRING(4.0 4.0, 5.0 5.0)'),
(15, 'LINESTRING(40.0 40.0, 41.0 41.0)'),
(16, 'POLYGON((1.0 1.0, 5.0 1.0, 5.0 5.0, 1.0 5.0, 1.0 1.0))')
query II
SELECT lk, rk FROM ltable JOIN [email protected]_index ON ST_Intersects(ltable.geom, rtable.geom) ORDER BY (lk, rk)
----
1 13
1 16
2 14
2 16
3 12
3 16
# LogicTest: 5node-default-configs !5node-metadata
statement ok
CREATE TABLE ltable(
lk int primary key,
geom geometry
)
statement ok
INSERT INTO ltable VALUES
(1, 'POINT(3.0 3.0)'),
(2, 'POINT(4.5 4.5)'),
(3, 'POINT(1.5 1.5)')
statement ok
CREATE TABLE rtable(
rk int primary key,
geom geometry,
INVERTED INDEX geom_index(geom)
)
statement ok
INSERT INTO rtable VALUES
(11, 'POINT(1.0 1.0)'),
(12, 'LINESTRING(1.0 1.0, 2.0 2.0)'),
(13, 'POINT(3.0 3.0)'),
(14, 'LINESTRING(4.0 4.0, 5.0 5.0)'),
(15, 'LINESTRING(40.0 40.0, 41.0 41.0)'),
(16, 'POLYGON((1.0 1.0, 5.0 1.0, 5.0 5.0, 1.0 5.0, 1.0 1.0))')
statement ok
ALTER TABLE ltable SPLIT AT VALUES (2), (3)
statement ok
ALTER TABLE ltable EXPERIMENTAL_RELOCATE VALUES (ARRAY[1], 1), (ARRAY[2], 2), (ARRAY[3], 3)
query TTTI colnames
SELECT start_key, end_key, replicas, lease_holder from [SHOW EXPERIMENTAL_RANGES FROM TABLE ltable] ORDER BY lease_holder
----
start_key end_key replicas lease_holder
NULL /2 {1} 1
/2 /3 {2} 2
/3 NULL {3} 3
query II
SELECT lk, rk FROM ltable JOIN [email protected]_index ON ST_Intersects(ltable.geom, rtable.geom) ORDER BY (lk, rk)
----
1 13
1 16
2 14
2 16
3 12
3 16
query T
SELECT url FROM [EXPLAIN (DISTSQL) SELECT lk, rk FROM ltable JOIN [email protected]_index
ON ST_Intersects(ltable.geom, rtable.geom) ORDER BY (lk, rk)]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJzMlF9v2jAUxd_3Kaz7VFSzYAf6J0_ZVialYqSDPmyqUJWRqypriDPbmZgQ331KQgdhxAlC2vqG7Xt87j354RWoHzE4MB2Ohh_uSSZj8nHifyIPwy93o3femJzdeNP76edRh2xK4mdK5HNZFevgW4zk1vfGRBa_3ScUi8coCXFJ_DFR-jFKNEqFc63OyvK3eQnd1BeLDvEnN8MJef-VnJXXd2ZAIREhjoMFKnAegAEFDhRsmFFIpZijUkLmR6ui0AuX4PQoREma6Xx7RmEuJIKzAh3pGMGB-9xxgkGI0uoBhRB1EMXF9amMFoH85ZYtAoVpGiTKIV2Lw2xNQWR6e6_SwROCw9a0vbeX_ESpMbwVUYLSsqv229Rc-dLBi4J8F1FCROIQl-9sD5ep3MvX5ZS4_Q5Q8DPtEJdRl1PXrh2AHzPAVEiN0rqoNu6yc-ra57UW9jEWeTabz3N9-PP8CWckxHOW7kSTB_oXcEUgg0og9Wn0a1vddihkiBLDAxnwPIMDM41FV6QW4xVJXQuDSgusPc2sHc0W71r1AbBj3Pd47r8-nhsG2PB8eQrPDRY7PLOa9-bfAc3b08Rb0mR3W7LU4L3H0uD1sdQwwIalq1NYarDYZanm3_5_HscDvU5QpSJR2OrN6-WvJoZPWL60SmRyjndSzAubcukXumIjRKXLU1YuvKQ8yhvcFTOjmJvF3Ci2zWLbKO5XxGxf3DfP3GA9MKovzOILo_jSLL40iq_M4qtTErs2J9ZrwKQBsibKzJixBs6YGbT9yWfrN78DAAD__5Ui0HY=
# LogicTest: local
# EXPLAIN test cases for using invertedJoiner on an inverted geospatial index.
statement ok
CREATE TABLE ltable(
lk int primary key,
geom geometry
)
statement ok
CREATE TABLE rtable(
rk1 int,
geom geometry,
rk2 string,
PRIMARY KEY (rk1, rk2),
INVERTED INDEX geom_index(geom)
)
query T
SELECT url FROM [EXPLAIN (DISTSQL)
SELECT lk, rk1 FROM ltable JOIN [email protected]_index ON ST_Intersects(ltable.geom, rtable.geom)]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJyUkkFv00AQhe_8itGcGmmgsV047GkRBMlViEuSA1JlVcYeRSbOrpldo6DI_x3ZTmkSkai5eWbfm_n2eXfoflWocDGZTj4toZEKvsyTr_A4-f4w_RjP4OZzvFguvk1HsJdUawJZB4Os8tmPiuE-iWcg_bdesd08labgLSQzcP6pNJ7Fce7dzSB_10lor--LUYqExhY8yzbsUD1igClhLTZn56x0rV0viIstqjFhaerGd-2UMLfCqHboS18xKlx2c-ecFSy3YyQs2Gdl1Y-tpdxk8kcPIEi4qDPjFLzFtCW0jX-Z6ny2YlRBS6_fHJvfLJ6Le1saltvgePlLMlqe9z874KctDVijQIcH7cm2lpMMdUig70ZImDRegQ5Ih6Qj0u_P3iG85g4d-z688P_h_YOfWrtu6gP0iPRdB3b633vmD0fM0Vna6BraObvaGsdHpOcmj9uUkIsVD-_J2UZyfhCb92uGMul9faNg54fTYChiMxx1gIfm4KI5vGwOL5qjE3PavvkbAAD__zf5Nns=
query TTT
EXPLAIN SELECT lk, rk1, rk2, rtable.geom
FROM ltable JOIN [email protected]_index ON ST_Intersects(ltable.geom, rtable.geom)
----
· distribution local
· vectorized true
render · ·
└── lookup-join · ·
│ table [email protected]
│ type inner
│ equality (rk1, rk2) = (rk1, rk2)
│ equality cols are key ·
│ parallel ·
│ pred st_intersects(geom, geom)
└── render · ·
└── inverted-join · ·
│ table [email protected]_index
│ type inner
│ · st_intersects(@2, @4)
│ parallel ·
└── scan · ·
· table [email protected]
· spans FULL SCAN
......@@ -76,6 +76,6 @@ CREATE TABLE rtable(
INVERTED INDEX geom_index(geom)
)
query error Geospatial joins are not yet supported
statement ok
SELECT url FROM [EXPLAIN (DISTSQL)
SELECT ltable.k, rtable.k FROM ltable JOIN [email protected]_index ON ST_Intersects(ltable.geom, rtable.geom)]
......@@ -620,3 +620,35 @@ render · ·
└── scan · · (k, geom) ·
· table [email protected]_index · ·
· spans /1152921504606846976-/1152921504606846977 /1152921573326323712-/1152921573326323713 /1152921574400065536-/1152921574400065537 /1152921574668500992-/1152921574668500993 /1152921574735609856-/1152921574735609857 /1152921574739804160-/1152921574739804161 /1152921574740066304-/1152921574740066305 /1152921574740070400-/1152921574740070401 /1152921574740070464-/1152921574740070465 /1152921574740070468-/1152921574740070469 /1152921574740070469-/1152921574740070470 /1152921574740070480-/1152921574740070481 /1152921574740070656-/1152921574740070657 /1152921574740071424-/1152921574740071425 /1152921574740082688-/1152921574740082689 /1152921574740131840-/1152921574740131841 /1152921574740852736-/1152921574740852737 /1152921574752387072-/1152921574752387073 /1152921577621291008-/1152921577621291009 /1152921590506192896-/1152921590506192897 /1152921779484753920-/1152921779484753921 /1152922604118474752-/1152922604118474753 /1152925902653358080-/1152925902653358081 /1152939096792891392-/1152939096792891393 /1152991873351024640-/1152991873351024641 /1153202979583557632-/1153202979583557633 /1154047404513689600-/1154047404513689601 /1157425104234217472-/1157425104234217473 /1170935903116328960-/1170935903116328961 /1224979098644774912-/1224979098644774913 /1441151880758558720-/1441151880758558721 · ·
statement ok
CREATE TABLE geo_table2(
k int primary key,
geom geometry
)
query TTTTT
EXPLAIN (VERBOSE) SELECT * FROM geo_table2 JOIN [email protected]_index
ON ST_Intersects(geo_table2.geom, geo_table.geom)
----
· distribution local · ·
· vectorized true · ·
lookup-join · · (k, geom, k, geom) ·
│ table [email protected] · ·
│ type inner · ·
│ equality (k) = (k) · ·
│ equality cols are key · · ·
│ parallel · · ·
│ pred st_intersects(geom, geom) · ·
└── render · · (k, geom, k) ·
│ render 0 k · ·
│ render 1 geom · ·
│ render 2 k · ·
└── inverted-join · · (k, geom, k, geom) ·
│ table [email protected]_index · ·
│ type inner · ·
│ · st_intersects(@2, @4) · ·
│ parallel · · ·
└── scan · · (k, geom) ·
· table [email protected] · ·
· spans FULL SCAN · ·
......@@ -760,10 +760,45 @@ func (ef *execFactory) ConstructInvertedJoin(
inputCol exec.NodeColumnOrdinal,
lookupCols exec.TableColumnOrdinalSet,
onCond tree.TypedExpr,
reqOrdering exec.OutputOrdering,
// The OutputOrdering is ignored since the inverted join always maintains
// the ordering of the input rows.
_ exec.OutputOrdering,
) (exec.Node, error) {
// TODO(rytaft, sumeerbhola): Fill this in.
return nil, errors.Errorf("Geospatial joins are not yet supported")
tabDesc := table.(*optTable).desc
indexDesc := index.(*optIndex).desc
// NB: lookupCols does not include the inverted column, which is only a partial
// representation of the original table column. This scan configuration does not
// affect what the invertedJoiner implementation retrieves from the inverted
// index (which includes the inverted column). This scan configuration is used
// later for computing the output from the inverted join.
colCfg := makeScanColumnsConfig(table, lookupCols)
tableScan := ef.planner.Scan()
if err := tableScan.initTable(context.TODO(), ef.planner, tabDesc, nil, colCfg); err != nil {
return nil, err
}
tableScan.index = indexDesc
n := &invertedJoinNode{
input: input.(planNode),
table: tableScan,
joinType: joinType,
invertedExpr: invertedExpr,
inputCol: int32(inputCol),
}
if onCond != nil && onCond != tree.DBoolTrue {
n.onExpr = onCond
}
// Build the result columns.
inputCols := planColumns(input.(planNode))
var scanCols sqlbase.ResultColumns
if joinType != sqlbase.LeftSemiJoin && joinType != sqlbase.LeftAntiJoin {
scanCols = planColumns(tableScan)
}
n.columns = make(sqlbase.ResultColumns, 0, len(inputCols)+len(scanCols))
n.columns = append(n.columns, inputCols...)
n.columns = append(n.columns, scanCols...)
return n, nil
}
// Helper function to create a scanNode from just a table / index descriptor
......
......@@ -93,6 +93,8 @@ func getPlanColumns(plan planNode, mut bool) sqlbase.ResultColumns {
return n.columns
case *invertedFilterNode:
return n.resultColumns
case *invertedJoinNode:
return n.columns
// Nodes with a fixed schema.
case *scrubNode:
......
......@@ -147,6 +147,9 @@ var _ execinfra.OpNode = &invertedJoiner{}
const invertedJoinerProcName = "inverted joiner"
// newInvertedJoiner constructs an invertedJoiner. The datumToInvertedExpr
// argument is non-nil only for tests. When nil, the invertedJoiner uses
// the spec to construct an implementation of DatumToInvertedExpr.
func newInvertedJoiner(
flowCtx *execinfra.FlowCtx,
processorID int32,
......
......@@ -204,6 +204,13 @@ func NewProcessor(
outputs[0], false, /* disableTempStorage */
)
}
if core.InvertedJoiner != nil {
if err := checkNumInOut(inputs, outputs, 1, 1); err != nil {
return nil, err
}
return newInvertedJoiner(
flowCtx, processorID, core.InvertedJoiner, nil, inputs[0], post, outputs[0])
}
if core.Backfiller != nil {
if err := checkNumInOut(inputs, outputs, 0, 1); err != nil {
return nil, err
......
......@@ -385,6 +385,22 @@ func (v *planVisitor) visitInternal(plan planNode, name string) {
}
n.input = v.visit(n.input)
case *invertedJoinNode:
if v.observer.attr != nil {
v.observer.attr(name, "table", fmt.Sprintf("%[email protected]%s", n.table.desc.Name, n.table.index.Name))
v.observer.attr(name, "type", joinTypeStr(n.joinType))
}
if v.observer.expr != nil {
v.expr(name, "", -1, n.invertedExpr)
if n.onExpr != nil && n.onExpr != tree.DBoolTrue {
v.expr(name, "onExpr", -1, n.onExpr)
}
}
if n.CanParallelize() {
v.observer.attr(name, "parallel", "")
}
n.input = v.visit(n.input)
case *limitNode:
if v.observer.expr != nil {
v.expr(name, "count", -1, n.countExpr)
......@@ -936,6 +952,7 @@ var planNodeNames = map[reflect.Type]string{
reflect.TypeOf(&insertNode{}): "insert",
reflect.TypeOf(&insertFastPathNode{}): "insert-fast-path",
reflect.TypeOf(&invertedFilterNode{}): "inverted-filter",
reflect.TypeOf(&invertedJoinNode{}): "inverted-join",
reflect.TypeOf(&joinNode{}): "join",
reflect.TypeOf(&limitNode{}): "limit",
reflect.TypeOf(&lookupJoinNode{}): "lookup-join",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment