diff --git a/go/vt/vtgate/planbuilder/horizon_planning.go b/go/vt/vtgate/planbuilder/horizon_planning.go index 997c67fa48b..3f5aa033bc1 100644 --- a/go/vt/vtgate/planbuilder/horizon_planning.go +++ b/go/vt/vtgate/planbuilder/horizon_planning.go @@ -154,55 +154,33 @@ func (hp *horizonPlanning) planAggregations() error { continue } - if e.Aggr && oa != nil { - fExpr, isFunc := e.Col.Expr.(*sqlparser.FuncExpr) - if !isFunc { - return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: in scatter query: complex aggregate expression") - } - opcode := engine.SupportedAggregates[fExpr.Name.Lowered()] - handleDistinct, innerAliased, err := oa.needDistinctHandlingGen4(fExpr, opcode, hp.semTable, hp.vschema) - if err != nil { - return err - } - - // Currently the OA engine primitive is able to handle only one distinct aggregation function. - // PreProcess being true tells that it is already handling it. - if oa.eaggr.PreProcess && handleDistinct { - return semantics.Gen4NotSupportedF("multiple distinct aggregation function") - } - - pushExpr := e.Col - var alias string - if handleDistinct { - pushExpr = innerAliased + fExpr, isFunc := e.Col.Expr.(*sqlparser.FuncExpr) + if !isFunc { + return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: in scatter query: complex aggregate expression") + } + opcode := engine.SupportedAggregates[fExpr.Name.Lowered()] + handleDistinct, innerAliased, err := hp.needDistinctHandling(fExpr, opcode, oa.input) + if err != nil { + return err + } - switch opcode { - case engine.AggregateCount: - opcode = engine.AggregateCountDistinct - case engine.AggregateSum: - opcode = engine.AggregateSumDistinct - } - if e.Col.As.IsEmpty() { - alias = sqlparser.String(e.Col.Expr) - } else { - alias = e.Col.As.String() - } + // Currently the OA engine primitive is able to handle only one distinct aggregation function. + // PreProcess being true tells that it is already handling it. + if oa.eaggr.PreProcess && handleDistinct { + return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "multiple distinct aggregation function") + } - oa.eaggr.PreProcess = true - hp.haveToTruncate(true) - hp.qp.GroupByExprs = append(hp.qp.GroupByExprs, abstract.GroupBy{Inner: innerAliased.Expr, WeightStrExpr: innerAliased.Expr, DistinctAggrIndex: len(oa.eaggr.Aggregates) + 1}) - } - offset, _, err := pushProjection(pushExpr, oa.input, hp.semTable, true, true) - if err != nil { - return err - } - oa.eaggr.Aggregates = append(oa.eaggr.Aggregates, &engine.AggregateParams{ - Opcode: opcode, - Col: offset, - Alias: alias, - Expr: fExpr, - }) + pushExpr, alias, opcode := hp.createPushExprAndAlias(e, handleDistinct, innerAliased, opcode, oa) + offset, _, err := pushProjection(pushExpr, oa.input, hp.semTable, true, true) + if err != nil { + return err } + oa.eaggr.Aggregates = append(oa.eaggr.Aggregates, &engine.AggregateParams{ + Opcode: opcode, + Col: offset, + Alias: alias, + Expr: fExpr, + }) } for _, groupExpr := range hp.qp.GroupByExprs { @@ -248,6 +226,44 @@ func (hp *horizonPlanning) planAggregations() error { return nil } +// createPushExprAndAlias creates the expression that should be pushed down to the leaves, +// and changes the opcode so it is a distinct one if needed +func (hp *horizonPlanning) createPushExprAndAlias( + expr abstract.SelectExpr, + handleDistinct bool, + innerAliased *sqlparser.AliasedExpr, + opcode engine.AggregateOpcode, + oa *orderedAggregate, +) (*sqlparser.AliasedExpr, string, engine.AggregateOpcode) { + pushExpr := expr.Col + var alias string + if handleDistinct { + pushExpr = innerAliased + + switch opcode { + case engine.AggregateCount: + opcode = engine.AggregateCountDistinct + case engine.AggregateSum: + opcode = engine.AggregateSumDistinct + } + if expr.Col.As.IsEmpty() { + alias = sqlparser.String(expr.Col.Expr) + } else { + alias = expr.Col.As.String() + } + + oa.eaggr.PreProcess = true + hp.haveToTruncate(true) + by := abstract.GroupBy{ + Inner: innerAliased.Expr, + WeightStrExpr: innerAliased.Expr, + DistinctAggrIndex: len(oa.eaggr.Aggregates) + 1, + } + hp.qp.GroupByExprs = append(hp.qp.GroupByExprs, by) + } + return pushExpr, alias, opcode +} + func hasUniqueVindex(vschema ContextVSchema, semTable *semantics.SemTable, groupByExprs []abstract.GroupBy) bool { for _, groupByExpr := range groupByExprs { if exprHasUniqueVindex(vschema, semTable, groupByExpr.WeightStrExpr) { @@ -606,3 +622,30 @@ func selectHasUniqueVindex(vschema ContextVSchema, semTable *semantics.SemTable, } return false } + +// needDistinctHandling returns true if oa needs to handle the distinct clause. +// If true, it will also return the aliased expression that needs to be pushed +// down into the underlying route. +func (hp *horizonPlanning) needDistinctHandling(funcExpr *sqlparser.FuncExpr, opcode engine.AggregateOpcode, input logicalPlan) (bool, *sqlparser.AliasedExpr, error) { + if !funcExpr.Distinct { + return false, nil, nil + } + if opcode != engine.AggregateCount && opcode != engine.AggregateSum { + return false, nil, nil + } + innerAliased, ok := funcExpr.Exprs[0].(*sqlparser.AliasedExpr) + if !ok { + return false, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "syntax error: %s", sqlparser.String(funcExpr)) + } + _, ok = input.(*route) + if !ok { + // Unreachable + return true, innerAliased, nil + } + if exprHasUniqueVindex(hp.vschema, hp.semTable, innerAliased.Expr) { + // if we can see a unique vindex on this table/column, + // we know the results will be unique, and we don't need to DISTINCTify them + return false, nil, nil + } + return true, innerAliased, nil +} diff --git a/go/vt/vtgate/planbuilder/ordered_aggregate.go b/go/vt/vtgate/planbuilder/ordered_aggregate.go index f8dcd407ac9..63b6cd33871 100644 --- a/go/vt/vtgate/planbuilder/ordered_aggregate.go +++ b/go/vt/vtgate/planbuilder/ordered_aggregate.go @@ -303,31 +303,6 @@ func (oa *orderedAggregate) needDistinctHandling(pb *primitiveBuilder, funcExpr return true, innerAliased, nil } -// needDistinctHandling returns true if oa needs to handle the distinct clause. -// If true, it will also return the aliased expression that needs to be pushed -// down into the underlying route. -func (oa *orderedAggregate) needDistinctHandlingGen4(funcExpr *sqlparser.FuncExpr, opcode engine.AggregateOpcode, semTable *semantics.SemTable, vschema ContextVSchema) (bool, *sqlparser.AliasedExpr, error) { - if !funcExpr.Distinct { - return false, nil, nil - } - if opcode != engine.AggregateCount && opcode != engine.AggregateSum { - return false, nil, nil - } - innerAliased, ok := funcExpr.Exprs[0].(*sqlparser.AliasedExpr) - if !ok { - return false, nil, fmt.Errorf("syntax error: %s", sqlparser.String(funcExpr)) - } - _, ok = oa.input.(*route) - if !ok { - // Unreachable - return true, innerAliased, nil - } - if exprHasUniqueVindex(vschema, semTable, innerAliased.Expr) { - return false, nil, nil - } - return true, innerAliased, nil -} - // Wireup implements the logicalPlan interface // If text columns are detected in the keys, then the function modifies // the primitive to pull a corresponding weight_string from mysql and