Skip to content

Commit

Permalink
Fix named value building.
Browse files Browse the repository at this point in the history
  • Loading branch information
arthurschreiber committed Oct 24, 2024
1 parent 2a3318c commit 1bd2b0b
Showing 1 changed file with 32 additions and 10 deletions.
42 changes: 32 additions & 10 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,46 +1234,68 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
if _, err := tx.Exec(sessionQuery); err != nil {
return rollback(err)
}
rowDeltas := make([]int64, 0, len(dmlEvents))
multiArgs := []driver.NamedValue{}
var multiQueryBuilder strings.Builder

buildResults := make([]*dmlBuildResult, 0, len(dmlEvents))
for _, dmlEvent := range dmlEvents {
for _, buildResult := range this.buildDMLEventQuery(dmlEvent) {
if buildResult.err != nil {
return rollback(buildResult.err)
}
for _, arg := range buildResult.args {
multiArgs = append(multiArgs, driver.NamedValue{Value: driver.Value(arg)})
}
rowDeltas = append(rowDeltas, buildResult.rowsDelta)
multiQueryBuilder.WriteString(buildResult.query)
multiQueryBuilder.WriteString(";\n")

buildResults = append(buildResults, buildResult)
}
}

//this.migrationContext.Log.Infof("Executing query: %s, args: %+v", multiQueryBuilder.String(), multiArgs)
execErr := conn.Raw(func(driverConn any) error {
ex, ok := driverConn.(driver.ExecerContext)
if !ok {
return fmt.Errorf("could not cast driverConn to ExecerContext")
}

nvc, ok := driverConn.(driver.NamedValueChecker)
if !ok {
return fmt.Errorf("could not cast driverConn to NamedValueChecker")
}

var multiArgs []driver.NamedValue
multiQueryBuilder := strings.Builder{}
var rowDeltas []int64

for _, buildResult := range buildResults {
for _, arg := range buildResult.args {
nv := driver.NamedValue{Value: driver.Value(arg)}
nvc.CheckNamedValue(&nv)
multiArgs = append(multiArgs, nv)
}

multiQueryBuilder.WriteString(buildResult.query)
multiQueryBuilder.WriteString(";\n")

rowDeltas = append(rowDeltas, buildResult.rowsDelta)
}

// this.migrationContext.Log.Infof("Executing query: %s, args: %+v", multiQueryBuilder.String(), multiArgs)
res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs)
if err != nil {
err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs)
this.migrationContext.Log.Errorf("Error exec: %+v", err)
return err
}

mysqlRes, ok := res.(drivermysql.Result)
if !ok {
return fmt.Errorf("Could not cast %+v to mysql.Result", res)
}

// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
for i, rowsAffected := range mysqlRes.AllRowsAffected() {
totalDelta += rowDeltas[i] * rowsAffected
}

return nil
})

if execErr != nil {
return rollback(execErr)
}
Expand Down

0 comments on commit 1bd2b0b

Please sign in to comment.