Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve query building routines of DML event queries, reducing time and allocations #1459

Merged
merged 4 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type Applier struct {
migrationContext *base.MigrationContext
finishedMigrating int64
name string

dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder
dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder
dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder
}

func NewApplier(migrationContext *base.MigrationContext) *Applier {
Expand Down Expand Up @@ -106,6 +110,37 @@ func (this *Applier) InitDBConnections() (err error) {
return nil
}

func (this *Applier) prepareQueries() (err error) {
if this.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder(
this.migrationContext.DatabaseName,
this.migrationContext.GetGhostTableName(),
this.migrationContext.OriginalTableColumns,
&this.migrationContext.UniqueKey.Columns,
); err != nil {
return err
}
if this.dmlInsertQueryBuilder, err = sql.NewDMLInsertQueryBuilder(
this.migrationContext.DatabaseName,
this.migrationContext.GetGhostTableName(),
this.migrationContext.OriginalTableColumns,
this.migrationContext.SharedColumns,
this.migrationContext.MappedSharedColumns,
); err != nil {
return err
}
if this.dmlUpdateQueryBuilder, err = sql.NewDMLUpdateQueryBuilder(
this.migrationContext.DatabaseName,
this.migrationContext.GetGhostTableName(),
this.migrationContext.OriginalTableColumns,
this.migrationContext.SharedColumns,
this.migrationContext.MappedSharedColumns,
&this.migrationContext.UniqueKey.Columns,
); err != nil {
return err
}
return nil
}

// validateAndReadGlobalVariables potentially reads server global variables, such as the time_zone and wait_timeout.
func (this *Applier) validateAndReadGlobalVariables() error {
query := `select /* gh-ost */ @@global.time_zone, @@global.wait_timeout`
Expand Down Expand Up @@ -1135,35 +1170,36 @@ func (this *Applier) updateModifiesUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEv

// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
// event entry on the original table.
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (results [](*dmlBuildResult)) {
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlBuildResult {
switch dmlEvent.DML {
case binlog.DeleteDML:
{
query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
return append(results, newDmlBuildResult(query, uniqueKeyArgs, -1, err))
query, uniqueKeyArgs, err := this.dmlDeleteQueryBuilder.BuildQuery(dmlEvent.WhereColumnValues.AbstractValues())
return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, -1, err)}
}
case binlog.InsertDML:
{
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
return append(results, newDmlBuildResult(query, sharedArgs, 1, err))
query, sharedArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues())
return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, 1, err)}
}
case binlog.UpdateDML:
{
if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified {
results := make([]*dmlBuildResult, 0, 2)
dmlEvent.DML = binlog.DeleteDML
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
dmlEvent.DML = binlog.InsertDML
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
return results
}
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
query, sharedArgs, uniqueKeyArgs, err := this.dmlUpdateQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
args := sqlutils.Args()
args = append(args, sharedArgs...)
args = append(args, uniqueKeyArgs...)
return append(results, newDmlBuildResult(query, args, 0, err))
return []*dmlBuildResult{newDmlBuildResult(query, args, 0, err)}
}
}
return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)))
return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))}
}

// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
Expand Down
7 changes: 7 additions & 0 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
columnValues := sql.ToColumnValues([]interface{}{123456, 42})

migrationContext := base.NewMigrationContext()
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "test"
migrationContext.OriginalTableColumns = columns
migrationContext.SharedColumns = columns
Expand All @@ -110,6 +111,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
}

applier := NewApplier(migrationContext)
applier.prepareQueries()

t.Run("delete", func(t *testing.T) {
binlogEvent := &binlog.BinlogDMLEvent{
Expand Down Expand Up @@ -290,8 +292,13 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "primary_key",
Columns: *sql.NewColumnList([]string{"id"}),
}

applier := NewApplier(migrationContext)
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
Expand Down
4 changes: 4 additions & 0 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,10 @@ func (this *Migrator) Migrate() (err error) {
if err := this.inspector.inspectOriginalAndGhostTables(); err != nil {
return err
}
// We can prepare some of the queries on the applier
if err := this.applier.prepareQueries(); err != nil {
return err
}
// Validation complete! We're good to execute this migration
if err := this.hooksExecutor.onValidated(); err != nil {
return err
Expand Down
170 changes: 170 additions & 0 deletions go/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,3 +530,173 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol
)
return result, sharedArgs, uniqueKeyArgs, nil
}

type DMLDeleteQueryBuilder struct {
tableColumns, uniqueKeyColumns *ColumnList
preparedStatement string
}

func NewDMLDeleteQueryBuilder(databaseName, tableName string, tableColumns, uniqueKeyColumns *ColumnList) (*DMLDeleteQueryBuilder, error) {
if uniqueKeyColumns.Len() == 0 {
return nil, fmt.Errorf("no unique key columns found in NewDMLDeleteQueryBuilder")
}
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)
equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names())
if err != nil {
return nil, err
}

stmt := fmt.Sprintf(`
delete /* gh-ost %s.%s */
from
%s.%s
where
%s`,
databaseName, tableName,
databaseName, tableName,
equalsComparison,
)

b := &DMLDeleteQueryBuilder{
tableColumns: tableColumns,
uniqueKeyColumns: uniqueKeyColumns,
preparedStatement: stmt,
}
return b, nil
}

func (b *DMLDeleteQueryBuilder) BuildQuery(args []interface{}) (string, []interface{}, error) {
if len(args) != b.tableColumns.Len() {
return "", nil, fmt.Errorf("args count differs from table column count in BuildDMLDeleteQuery")
}
uniqueKeyArgs := make([]interface{}, 0, b.uniqueKeyColumns.Len())
for _, column := range b.uniqueKeyColumns.Columns() {
tableOrdinal := b.tableColumns.Ordinals[column.Name]
arg := column.convertArg(args[tableOrdinal], true)
uniqueKeyArgs = append(uniqueKeyArgs, arg)
}
return b.preparedStatement, uniqueKeyArgs, nil
}

type DMLInsertQueryBuilder struct {
tableColumns, sharedColumns *ColumnList
preparedStatement string
}

func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList) (*DMLInsertQueryBuilder, error) {
if !sharedColumns.IsSubsetOf(tableColumns) {
return nil, fmt.Errorf("shared columns is not a subset of table columns in NewDMLInsertQueryBuilder")
}
if sharedColumns.Len() == 0 {
return nil, fmt.Errorf("no shared columns found in NewDMLInsertQueryBuilder")
}
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)
mappedSharedColumnNames := duplicateNames(mappedSharedColumns.Names())
for i := range mappedSharedColumnNames {
mappedSharedColumnNames[i] = EscapeName(mappedSharedColumnNames[i])
}
preparedValues := buildColumnsPreparedValues(mappedSharedColumns)

stmt := fmt.Sprintf(`
replace /* gh-ost %s.%s */
into
%s.%s
(%s)
values
(%s)`,
databaseName, tableName,
databaseName, tableName,
strings.Join(mappedSharedColumnNames, ", "),
strings.Join(preparedValues, ", "),
)

return &DMLInsertQueryBuilder{
tableColumns: tableColumns,
sharedColumns: sharedColumns,
preparedStatement: stmt,
}, nil
}

func (b *DMLInsertQueryBuilder) BuildQuery(args []interface{}) (string, []interface{}, error) {
if len(args) != b.tableColumns.Len() {
return "", nil, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery")
}
sharedArgs := make([]interface{}, 0, b.sharedColumns.Len())
for _, column := range b.sharedColumns.Columns() {
tableOrdinal := b.tableColumns.Ordinals[column.Name]
arg := column.convertArg(args[tableOrdinal], false)
sharedArgs = append(sharedArgs, arg)
}
return b.preparedStatement, sharedArgs, nil
}

type DMLUpdateQueryBuilder struct {
tableColumns, sharedColumns, uniqueKeyColumns *ColumnList
preparedStatement string
}

func NewDMLUpdateQueryBuilder(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns, uniqueKeyColumns *ColumnList) (*DMLUpdateQueryBuilder, error) {
if !sharedColumns.IsSubsetOf(tableColumns) {
return nil, fmt.Errorf("shared columns is not a subset of table columns in NewDMLUpdateQueryBuilder")
}
if sharedColumns.Len() == 0 {
return nil, fmt.Errorf("no shared columns found in NewDMLUpdateQueryBuilder")
}
if uniqueKeyColumns.Len() == 0 {
return nil, fmt.Errorf("no unique key columns found in NewDMLUpdateQueryBuilder")
}
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)
setClause, err := BuildSetPreparedClause(mappedSharedColumns)
if err != nil {
return nil, err
}

equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names())
if err != nil {
return nil, err
}
stmt := fmt.Sprintf(`
update /* gh-ost %s.%s */
%s.%s
set
%s
where
%s`,
databaseName, tableName,
databaseName, tableName,
setClause,
equalsComparison,
)
return &DMLUpdateQueryBuilder{
tableColumns: tableColumns,
sharedColumns: sharedColumns,
uniqueKeyColumns: uniqueKeyColumns,
preparedStatement: stmt,
}, nil
}

func (b *DMLUpdateQueryBuilder) BuildQuery(valueArgs, whereArgs []interface{}) (string, []interface{}, []interface{}, error) {
// TODO: move this check back to `NewDMLUpdateQueryBuilder()`, needs fix on generated columns.
if !b.uniqueKeyColumns.IsSubsetOf(b.sharedColumns) {
return "", nil, nil, fmt.Errorf("unique key columns is not a subset of shared columns in DMLUpdateQueryBuilder")
}

sharedArgs := make([]interface{}, 0, b.sharedColumns.Len())
for _, column := range b.sharedColumns.Columns() {
tableOrdinal := b.tableColumns.Ordinals[column.Name]
arg := column.convertArg(valueArgs[tableOrdinal], false)
sharedArgs = append(sharedArgs, arg)
}

uniqueKeyArgs := make([]interface{}, 0, b.uniqueKeyColumns.Len())
for _, column := range b.uniqueKeyColumns.Columns() {
tableOrdinal := b.tableColumns.Ordinals[column.Name]
arg := column.convertArg(whereArgs[tableOrdinal], true)
uniqueKeyArgs = append(uniqueKeyArgs, arg)
}

return b.preparedStatement, sharedArgs, uniqueKeyArgs, nil
}