Skip to content

Commit

Permalink
WiP
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 31, 2025
1 parent cb7d61a commit 94f6471
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type Plan struct {
// of the table.
Filters []Filter

// WHERE clauses in the Filter query that we can push down to
// MySQL to reduce the returned rows we need to filter.
whereExprsToPushDown []sqlparser.Expr

// Convert any integer values seen in the binlog events for ENUM or SET
// columns to the string values. The map is keyed on the column number, with
// the value being the map of ordinal values to string values.
Expand Down Expand Up @@ -622,6 +626,8 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
ColNum: colnum,
Value: resolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()),
})
// Add it to the column expressions so that it's passed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
case *sqlparser.FuncExpr:
if !expr.Name.EqualString("in_keyrange") {
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
Expand All @@ -648,6 +654,8 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
Opcode: IsNotNull,
ColNum: colnum,
})
// Add it to the column expressions so that it's passed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
default:
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
}
Expand Down
10 changes: 10 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,16 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error
st.Name, rs.lastpk, rs.pkColumns)
}
buf.WriteString(" where ")
// First we add any predicates that should be pushed down.
for i, expr := range rs.plan.whereExprsToPushDown {
if i != 0 {
buf.Myprintf(" and ")
}
buf.Myprintf("%s", sqlparser.String(expr))
}
if len(rs.plan.whereExprsToPushDown) > 0 {
buf.Myprintf(" and ")
}
prefix := ""
// This loop handles the case for composite PKs. For example,
// if lastpk was (1,2), the where clause would be:
Expand Down

0 comments on commit 94f6471

Please sign in to comment.