Skip to content

Commit

Permalink
small refactoring to move code closer together
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <andres@planetscale.com>
  • Loading branch information
systay committed Jul 28, 2021
1 parent c4ba17d commit 3407e07
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 71 deletions.
135 changes: 89 additions & 46 deletions go/vt/vtgate/planbuilder/horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,55 +154,33 @@ func (hp *horizonPlanning) planAggregations() error {
continue
}

if e.Aggr && oa != nil {
fExpr, isFunc := e.Col.Expr.(*sqlparser.FuncExpr)
if !isFunc {
return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: in scatter query: complex aggregate expression")
}
opcode := engine.SupportedAggregates[fExpr.Name.Lowered()]
handleDistinct, innerAliased, err := oa.needDistinctHandlingGen4(fExpr, opcode, hp.semTable, hp.vschema)
if err != nil {
return err
}

// Currently the OA engine primitive is able to handle only one distinct aggregation function.
// PreProcess being true tells that it is already handling it.
if oa.eaggr.PreProcess && handleDistinct {
return semantics.Gen4NotSupportedF("multiple distinct aggregation function")
}

pushExpr := e.Col
var alias string
if handleDistinct {
pushExpr = innerAliased
fExpr, isFunc := e.Col.Expr.(*sqlparser.FuncExpr)
if !isFunc {
return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: in scatter query: complex aggregate expression")
}
opcode := engine.SupportedAggregates[fExpr.Name.Lowered()]
handleDistinct, innerAliased, err := hp.needDistinctHandling(fExpr, opcode, oa.input)
if err != nil {
return err
}

switch opcode {
case engine.AggregateCount:
opcode = engine.AggregateCountDistinct
case engine.AggregateSum:
opcode = engine.AggregateSumDistinct
}
if e.Col.As.IsEmpty() {
alias = sqlparser.String(e.Col.Expr)
} else {
alias = e.Col.As.String()
}
// Currently the OA engine primitive is able to handle only one distinct aggregation function.
// PreProcess being true tells that it is already handling it.
if oa.eaggr.PreProcess && handleDistinct {
return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "multiple distinct aggregation function")
}

oa.eaggr.PreProcess = true
hp.haveToTruncate(true)
hp.qp.GroupByExprs = append(hp.qp.GroupByExprs, abstract.GroupBy{Inner: innerAliased.Expr, WeightStrExpr: innerAliased.Expr, DistinctAggrIndex: len(oa.eaggr.Aggregates) + 1})
}
offset, _, err := pushProjection(pushExpr, oa.input, hp.semTable, true, true)
if err != nil {
return err
}
oa.eaggr.Aggregates = append(oa.eaggr.Aggregates, &engine.AggregateParams{
Opcode: opcode,
Col: offset,
Alias: alias,
Expr: fExpr,
})
pushExpr, alias, opcode := hp.createPushExprAndAlias(e, handleDistinct, innerAliased, opcode, oa)
offset, _, err := pushProjection(pushExpr, oa.input, hp.semTable, true, true)
if err != nil {
return err
}
oa.eaggr.Aggregates = append(oa.eaggr.Aggregates, &engine.AggregateParams{
Opcode: opcode,
Col: offset,
Alias: alias,
Expr: fExpr,
})
}

for _, groupExpr := range hp.qp.GroupByExprs {
Expand Down Expand Up @@ -248,6 +226,44 @@ func (hp *horizonPlanning) planAggregations() error {
return nil
}

// createPushExprAndAlias creates the expression that should be pushed down to the leaves,
// and changes the opcode so it is a distinct one if needed
func (hp *horizonPlanning) createPushExprAndAlias(
expr abstract.SelectExpr,
handleDistinct bool,
innerAliased *sqlparser.AliasedExpr,
opcode engine.AggregateOpcode,
oa *orderedAggregate,
) (*sqlparser.AliasedExpr, string, engine.AggregateOpcode) {
pushExpr := expr.Col
var alias string
if handleDistinct {
pushExpr = innerAliased

switch opcode {
case engine.AggregateCount:
opcode = engine.AggregateCountDistinct
case engine.AggregateSum:
opcode = engine.AggregateSumDistinct
}
if expr.Col.As.IsEmpty() {
alias = sqlparser.String(expr.Col.Expr)
} else {
alias = expr.Col.As.String()
}

oa.eaggr.PreProcess = true
hp.haveToTruncate(true)
by := abstract.GroupBy{
Inner: innerAliased.Expr,
WeightStrExpr: innerAliased.Expr,
DistinctAggrIndex: len(oa.eaggr.Aggregates) + 1,
}
hp.qp.GroupByExprs = append(hp.qp.GroupByExprs, by)
}
return pushExpr, alias, opcode
}

func hasUniqueVindex(vschema ContextVSchema, semTable *semantics.SemTable, groupByExprs []abstract.GroupBy) bool {
for _, groupByExpr := range groupByExprs {
if exprHasUniqueVindex(vschema, semTable, groupByExpr.WeightStrExpr) {
Expand Down Expand Up @@ -606,3 +622,30 @@ func selectHasUniqueVindex(vschema ContextVSchema, semTable *semantics.SemTable,
}
return false
}

// needDistinctHandling returns true if oa needs to handle the distinct clause.
// If true, it will also return the aliased expression that needs to be pushed
// down into the underlying route.
func (hp *horizonPlanning) needDistinctHandling(funcExpr *sqlparser.FuncExpr, opcode engine.AggregateOpcode, input logicalPlan) (bool, *sqlparser.AliasedExpr, error) {
if !funcExpr.Distinct {
return false, nil, nil
}
if opcode != engine.AggregateCount && opcode != engine.AggregateSum {
return false, nil, nil
}
innerAliased, ok := funcExpr.Exprs[0].(*sqlparser.AliasedExpr)
if !ok {
return false, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "syntax error: %s", sqlparser.String(funcExpr))
}
_, ok = input.(*route)
if !ok {
// Unreachable
return true, innerAliased, nil
}
if exprHasUniqueVindex(hp.vschema, hp.semTable, innerAliased.Expr) {
// if we can see a unique vindex on this table/column,
// we know the results will be unique, and we don't need to DISTINCTify them
return false, nil, nil
}
return true, innerAliased, nil
}
25 changes: 0 additions & 25 deletions go/vt/vtgate/planbuilder/ordered_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,31 +303,6 @@ func (oa *orderedAggregate) needDistinctHandling(pb *primitiveBuilder, funcExpr
return true, innerAliased, nil
}

// needDistinctHandling returns true if oa needs to handle the distinct clause.
// If true, it will also return the aliased expression that needs to be pushed
// down into the underlying route.
func (oa *orderedAggregate) needDistinctHandlingGen4(funcExpr *sqlparser.FuncExpr, opcode engine.AggregateOpcode, semTable *semantics.SemTable, vschema ContextVSchema) (bool, *sqlparser.AliasedExpr, error) {
if !funcExpr.Distinct {
return false, nil, nil
}
if opcode != engine.AggregateCount && opcode != engine.AggregateSum {
return false, nil, nil
}
innerAliased, ok := funcExpr.Exprs[0].(*sqlparser.AliasedExpr)
if !ok {
return false, nil, fmt.Errorf("syntax error: %s", sqlparser.String(funcExpr))
}
_, ok = oa.input.(*route)
if !ok {
// Unreachable
return true, innerAliased, nil
}
if exprHasUniqueVindex(vschema, semTable, innerAliased.Expr) {
return false, nil, nil
}
return true, innerAliased, nil
}

// Wireup implements the logicalPlan interface
// If text columns are detected in the keys, then the function modifies
// the primitive to pull a corresponding weight_string from mysql and
Expand Down

0 comments on commit 3407e07

Please sign in to comment.