Skip to content

Commit

Permalink
Pushdown vstream filter expressions to MySQL
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 31, 2025
1 parent 94f6471 commit 03c8168
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 11 deletions.
4 changes: 2 additions & 2 deletions examples/local/vstream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
*/
func main() {
ctx := context.Background()
streamCustomer := true
streamCustomer := false
var vgtid *binlogdatapb.VGtid
if streamCustomer {
vgtid = &binlogdatapb.VGtid{
Expand All @@ -64,7 +64,7 @@ func main() {
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "customer",
Filter: "select * from customer",
Filter: "select * from customer where customer_id > 5 and customer_id < 10",
}},
}
conn, err := vtgateconn.Dial(ctx, "localhost:15991")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"

Expand Down Expand Up @@ -213,6 +214,7 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum

filter := rule.Filter
query := filter
log.Errorf("DEBUG: building table plan for %s with filter: %s", tableName, filter)
// generate equivalent select statement if filter is empty or a keyrange.
switch {
case filter == "":
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/vstreamer/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
lastPK := getLastPKFromQR(uvs.plans[tableName].tablePK.Lastpk)
filter := uvs.plans[tableName].rule.Filter

log.Infof("Starting copyTable for %s, PK %v", tableName, lastPK)
log.Infof("Starting copyTable for %s, Filter: %s, LastPK: %v", tableName, filter, lastPK)
uvs.sendTestEvent(fmt.Sprintf("Copy Start %s", tableName))

err := uvs.vse.StreamRows(ctx, filter, lastPK, func(rows *binlogdatapb.VStreamRowsResponse) error {
Expand Down
9 changes: 7 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
}
exprs := splitAndExpression(nil, where.Expr)
for _, expr := range exprs {
log.Errorf("DEBUG: analyzing where expression of type %T: %v", expr, sqlparser.String(expr))
switch expr := expr.(type) {
case *sqlparser.ComparisonExpr:
opcode, err := getOpcode(expr)
Expand Down Expand Up @@ -605,6 +606,12 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
if !ok {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
}
// Add it to the column expressions so that it's passed down to mysqld.
if plan.whereExprsToPushDown == nil {
plan.whereExprsToPushDown = make([]sqlparser.Expr, 0)
}
log.Errorf("DEBUG: adding to list of pushdown expressions: %v", sqlparser.String(expr))
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
// StrVal is varbinary, we do not support varchar since we would have to implement all collation types
if val.Type != sqlparser.IntVal && val.Type != sqlparser.StrVal {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
Expand All @@ -626,8 +633,6 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
ColNum: colnum,
Value: resolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()),
})
// Add it to the column expressions so that it's passed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
case *sqlparser.FuncExpr:
if !expr.Name.EqualString("in_keyrange") {
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
Expand Down
24 changes: 18 additions & 6 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (rs *rowStreamer) Stream() error {
func (rs *rowStreamer) buildPlan() error {
// This pre-parsing is required to extract the table name
// and create its metadata.
log.Errorf("DEBUG: building rowstreamer plan from query: %s", rs.query)
sel, fromTable, err := analyzeSelect(rs.query, rs.se.Environment().Parser())
if err != nil {
return err
Expand Down Expand Up @@ -176,6 +177,7 @@ func (rs *rowStreamer) buildPlan() error {
log.Errorf("%s", err.Error())
return err
}
log.Errorf("DEBUG: rowstreamer plan: %+v", rs.plan)

directives := sel.Comments.Directives()
if s, found := directives.GetString("ukColumns", ""); found {
Expand All @@ -198,6 +200,7 @@ func (rs *rowStreamer) buildPlan() error {
if err != nil {
return err
}
log.Errorf("DEBUG: rowstreamer final query: %s", rs.sendQuery)
return err
}

Expand Down Expand Up @@ -260,6 +263,17 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error
}
prefix = ", "
}

addPushdownExpressions := func() {
// First we add any predicates that should be pushed down.
for i, expr := range rs.plan.whereExprsToPushDown {
if i != 0 {
buf.Myprintf(" and ")
}
log.Errorf("DEBUG: adding pushdown expression to rowstreamer query: %s", sqlparser.String(expr))
buf.Myprintf("%s", sqlparser.String(expr))
}
}
// If we know the index name that we should be using then tell MySQL
// to use it if possible. This helps to ensure that we are able to
// leverage the ordering from the index itself and avoid having to
Expand All @@ -280,13 +294,8 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error
st.Name, rs.lastpk, rs.pkColumns)
}
buf.WriteString(" where ")
addPushdownExpressions()
// First we add any predicates that should be pushed down.
for i, expr := range rs.plan.whereExprsToPushDown {
if i != 0 {
buf.Myprintf(" and ")
}
buf.Myprintf("%s", sqlparser.String(expr))
}
if len(rs.plan.whereExprsToPushDown) > 0 {
buf.Myprintf(" and ")
}
Expand All @@ -308,6 +317,9 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error
rs.lastpk[lastcol].EncodeSQL(buf)
buf.Myprintf(")")
}
} else if len(rs.plan.whereExprsToPushDown) > 0 {
buf.Myprintf(" where ")
addPushdownExpressions()
}
buf.Myprintf(" order by ", sqlparser.NewIdentifierCS(rs.plan.Table.Name))
prefix = ""
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func (uvs *uvstreamer) buildTablePlan() error {
if rule == nil {
continue
}
log.Errorf("DEBUG: building table plan for table %s, with filter: %s", tableName, rule.Filter)
plan := &tablePlan{
tablePK: nil,
rule: &binlogdatapb.Rule{
Expand Down

0 comments on commit 03c8168

Please sign in to comment.