From 94f6471e5977a6107a225a64c345a7bc7d265c94 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 31 Jan 2025 13:19:44 -0500 Subject: [PATCH] WiP Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/vstreamer/planbuilder.go | 8 ++++++++ go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go | 10 ++++++++++ 2 files changed, 18 insertions(+) diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index e5115afe6d3..bccc496c98f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -58,6 +58,10 @@ type Plan struct { // of the table. Filters []Filter + // WHERE clauses in the Filter query that we can push down to + // MySQL to reduce the returned rows we need to filter. + whereExprsToPushDown []sqlparser.Expr + // Convert any integer values seen in the binlog events for ENUM or SET // columns to the string values. The map is keyed on the column number, with // the value being the map of ordinal values to string values. @@ -622,6 +626,8 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er ColNum: colnum, Value: resolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()), }) + // Add it to the column expressions so that it's passed down to mysqld. + plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr) case *sqlparser.FuncExpr: if !expr.Name.EqualString("in_keyrange") { return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) @@ -648,6 +654,8 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er Opcode: IsNotNull, ColNum: colnum, }) + // Add it to the column expressions so that it's passed down to mysqld. + plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr) default: return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 317936e4289..1fc3017ac19 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -280,6 +280,16 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error st.Name, rs.lastpk, rs.pkColumns) } buf.WriteString(" where ") + // First we add any predicates that should be pushed down. + for i, expr := range rs.plan.whereExprsToPushDown { + if i != 0 { + buf.Myprintf(" and ") + } + buf.Myprintf("%s", sqlparser.String(expr)) + } + if len(rs.plan.whereExprsToPushDown) > 0 { + buf.Myprintf(" and ") + } prefix := "" // This loop handles the case for composite PKs. For example, // if lastpk was (1,2), the where clause would be: