Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable REPLACE INTO engine and Fix Foreign key locking issue #14532

Merged
merged 10 commits into from
Nov 20, 2023
1 change: 1 addition & 0 deletions go/mysql/sqlerror/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ const (
ERJSONValueTooBig = ErrorCode(3150)
ERJSONDocumentTooDeep = ErrorCode(3157)

ERLockNowait = ErrorCode(3572)
ERRegexpStringNotTerminated = ErrorCode(3684)
ERRegexpBufferOverflow = ErrorCode(3684)
ERRegexpIllegalArgument = ErrorCode(3685)
Expand Down
55 changes: 25 additions & 30 deletions go/test/endtoend/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,51 +256,46 @@ func WaitForAuthoritative(t *testing.T, ks, tbl string, readVSchema func() (*int

// WaitForKsError waits for the ks error field to be populated and returns it.
func WaitForKsError(t *testing.T, vtgateProcess cluster.VtgateProcess, ks string) string {
var errString string
WaitForVschemaCondition(t, vtgateProcess, ks, func(t *testing.T, keyspace map[string]interface{}) bool {
ksErr, fieldPresent := keyspace["error"]
if !fieldPresent {
return false
}
var ok bool
errString, ok = ksErr.(string)
return ok
})
return errString
}

// WaitForVschemaCondition waits for the condition to be true
func WaitForVschemaCondition(t *testing.T, vtgateProcess cluster.VtgateProcess, ks string, conditionMet func(t *testing.T, keyspace map[string]interface{}) bool) {
timeout := time.After(60 * time.Second)
for {
select {
case <-timeout:
t.Fatalf("schema tracking did not find error in '%s'", ks)
return ""
t.Fatalf("schema tracking did not met the condition within the time for keyspace: %s", ks)
default:
res, err := vtgateProcess.ReadVSchema()
require.NoError(t, err, res)
kss := convertToMap(*res)["keyspaces"]
ksMap := convertToMap(convertToMap(kss)[ks])
ksErr, fieldPresent := ksMap["error"]
if !fieldPresent {
time.Sleep(100 * time.Millisecond)
continue
if conditionMet(t, ksMap) {
return
}
errString, isErr := ksErr.(string)
if !isErr {
time.Sleep(100 * time.Millisecond)
continue
}
return errString
time.Sleep(100 * time.Millisecond)
}
}
}

// WaitForTableDeletions waits for a table to be deleted
func WaitForTableDeletions(ctx context.Context, t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl string) error {
for {
select {
case <-ctx.Done():
return fmt.Errorf("schema tracking still found the table '%s'", tbl)
default:
res, err := vtgateProcess.ReadVSchema()
require.NoError(t, err, res)
keyspacesMap := convertToMap(*res)["keyspaces"]
ksMap := convertToMap(keyspacesMap)[ks]
tablesMap := convertToMap(ksMap)["tables"]
_, isPresent := convertToMap(tablesMap)[tbl]
if !isPresent {
return nil
}
time.Sleep(100 * time.Millisecond)
}
}
func WaitForTableDeletions(ctx context.Context, t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl string) {
WaitForVschemaCondition(t, vtgateProcess, ks, func(t *testing.T, keyspace map[string]interface{}) bool {
tablesMap := keyspace["tables"]
_, isPresent := convertToMap(tablesMap)[tbl]
return !isPresent
})
}

// WaitForColumn waits for a table's column to be present
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (fz *fuzzer) generateQuery() []string {
}

func getInsertType() string {
return "insert"
return []string{"insert", "replace"}[rand.Intn(2)]
}

// generateInsertDMLQuery generates an INSERT query from the parameters for the fuzzer.
Expand Down
36 changes: 29 additions & 7 deletions go/test/endtoend/vtgate/foreignkey/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,20 +1000,26 @@ func TestCyclicFks(t *testing.T) {

// Create a cyclic foreign key constraint.
utils.Exec(t, mcmp.VtConn, "alter table fk_t10 add constraint test_cyclic_fks foreign key (col) references fk_t12 (col) on delete cascade on update cascade")
defer func() {
utils.Exec(t, mcmp.VtConn, "alter table fk_t10 drop foreign key test_cyclic_fks")
}()
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved

// Wait for schema-tracking to be complete.
ksErr := utils.WaitForKsError(t, clusterInstance.VtgateProcess, unshardedKs)
// Make sure Vschema has the error for cyclic foreign keys.
assert.Contains(t, ksErr, "VT09019: uks has cyclic foreign keys")
errString := utils.WaitForKsError(t, clusterInstance.VtgateProcess, unshardedKs)
assert.Contains(t, errString, "VT09019: keyspace 'uks' has cyclic foreign keys")

// Ensure that the Vitess database is originally empty
ensureDatabaseState(t, mcmp.VtConn, true)

_, err := utils.ExecAllowError(t, mcmp.VtConn, "insert into fk_t10(id, col) values (1, 1)")
require.ErrorContains(t, err, "VT09019: uks has cyclic foreign keys")
require.ErrorContains(t, err, "VT09019: keyspace 'uks' has cyclic foreign keys")

// Drop the cyclic foreign key constraint.
utils.Exec(t, mcmp.VtConn, "alter table fk_t10 drop foreign key test_cyclic_fks")

// Wait for schema-tracking to be complete.
utils.WaitForVschemaCondition(t, clusterInstance.VtgateProcess, unshardedKs, func(t *testing.T, keyspace map[string]interface{}) bool {
_, fieldPresent := keyspace["error"]
return !fieldPresent
})

}

func TestReplace(t *testing.T) {
Expand Down Expand Up @@ -1175,4 +1181,20 @@ func TestReplaceWithFK(t *testing.T) {
// replace some data.
_, err := utils.ExecAllowError(t, conn, `replace into t1(id, col) values (1, 1)`)
require.ErrorContains(t, err, "VT12001: unsupported: REPLACE INTO with sharded keyspace (errno 1235) (sqlstate 42000)")

_ = utils.Exec(t, conn, `use uks`)

_ = utils.Exec(t, conn, `replace into u_t1(id, col1) values (1, 1), (2, 1)`)
// u_t1: (1,1) (2,1)

_ = utils.Exec(t, conn, `replace into u_t2(id, col2) values (1, 1), (2, 1)`)
// u_t1: (1,1) (2,1)
// u_t2: (1,1) (2,1)

_ = utils.Exec(t, conn, `replace into u_t1(id, col1) values (2, 2)`)
// u_t1: (1,1) (2,2)
// u_t2: (1,null) (2,null)

utils.AssertMatches(t, conn, `select * from u_t1`, `[[INT64(1) INT64(1)] [INT64(2) INT64(2)]]`)
utils.AssertMatches(t, conn, `select * from u_t2`, `[[INT64(1) NULL] [INT64(2) NULL]]`)
}
5 changes: 3 additions & 2 deletions go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,8 +707,7 @@ func createInitialSchema(t *testing.T, tcase *testCase) {
timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
for _, tableName := range tableNames {
err := utils.WaitForTableDeletions(timeoutCtx, t, clusterInstance.VtgateProcess, keyspaceName, tableName)
require.NoError(t, err)
utils.WaitForTableDeletions(timeoutCtx, t, clusterInstance.VtgateProcess, keyspaceName, tableName)
}
})
t.Run("creating tables", func(t *testing.T) {
Expand Down Expand Up @@ -922,6 +921,8 @@ func isFKError(err error) bool {
return false
case sqlerror.ERLockDeadlock:
return false // bummer, but deadlocks can happen, it's a legit error.
case sqlerror.ERLockNowait:
return false // For some queries we use NOWAIT. Bummer, but this can happen, it's a legit error.
case sqlerror.ERNoReferencedRow,
sqlerror.ERRowIsReferenced,
sqlerror.ERRowIsReferenced2,
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vterrors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ var (
VT09016 = errorWithState("VT09016", vtrpcpb.Code_FAILED_PRECONDITION, RowIsReferenced2, "Cannot delete or update a parent row: a foreign key constraint fails", "SET DEFAULT is not supported by InnoDB")
VT09017 = errorWithoutState("VT09017", vtrpcpb.Code_FAILED_PRECONDITION, "%s", "Invalid syntax for the statement type.")
VT09018 = errorWithoutState("VT09018", vtrpcpb.Code_FAILED_PRECONDITION, "%s", "Invalid syntax for the vindex function statement.")
VT09019 = errorWithoutState("VT09019", vtrpcpb.Code_FAILED_PRECONDITION, "%s has cyclic foreign keys", "Vitess doesn't support cyclic foreign keys.")
VT09019 = errorWithoutState("VT09019", vtrpcpb.Code_FAILED_PRECONDITION, "keyspace '%s' has cyclic foreign keys", "Vitess doesn't support cyclic foreign keys.")

VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.")
VT10002 = errorWithoutState("VT10002", vtrpcpb.Code_ABORTED, "'replace into' with foreign key constraints are not allowed", "Foreign key constraints sometimes are not written in binary logs and will cause issue with vreplication workflows like online-ddl.")
Expand Down
22 changes: 20 additions & 2 deletions go/vt/vtgate/engine/sequential.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,30 @@ func (s *Sequential) GetTableName() string {

// TryExecute performs a non-streaming exec.
func (s *Sequential) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantFields bool) (*sqltypes.Result, error) {
return nil, vterrors.VT10002()
finalRes := &sqltypes.Result{}
for _, source := range s.Sources {
res, err := vcursor.ExecutePrimitive(ctx, source, bindVars, wantFields)
if err != nil {
return nil, err
}
finalRes.RowsAffected += res.RowsAffected
if finalRes.InsertID == 0 {
finalRes.InsertID = res.InsertID
}
if res.Info != "" {
finalRes.Info = res.Info
}
}
return finalRes, nil
}

// TryStreamExecute performs a streaming exec.
func (s *Sequential) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantFields bool, callback func(*sqltypes.Result) error) error {
return vterrors.VT10002()
qr, err := s.TryExecute(ctx, vcursor, bindVars, wantFields)
if err != nil {
return err
}
return callback(qr)
}

// GetFields fetches the field info.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func createFkCascadeOpForDelete(ctx *plancontext.PlanningContext, parentOp ops.O
}
fkChildren = append(fkChildren, fkChild)
}
selectionOp, err := createSelectionOp(ctx, selectExprs, delStmt.TableExprs, delStmt.Where, nil, nil, sqlparser.ForUpdateLock)
selectionOp, err := createSelectionOp(ctx, selectExprs, delStmt.TableExprs, delStmt.Where, nil, nil, sqlparser.ForUpdateLockNoWait)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/planbuilder/operators/query_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func exposeColumnsThroughDerivedTable(ctx *plancontext.PlanningContext, p *Proje
}

expr = semantics.RewriteDerivedTableExpression(expr, derivedTbl)
out := prefixColNames(tblName, expr)
out := prefixColNames(ctx, tblName, expr)

alias := sqlparser.UnescapedString(out)
predicate.LHSExprs[idx].Expr = sqlparser.NewColNameWithQualifier(alias, derivedTblName)
Expand All @@ -450,14 +450,14 @@ func exposeColumnsThroughDerivedTable(ctx *plancontext.PlanningContext, p *Proje

// prefixColNames adds qualifier prefixes to all ColName:s.
// We want to be more explicit than the user was to make sure we never produce invalid SQL
func prefixColNames(tblName sqlparser.TableName, e sqlparser.Expr) sqlparser.Expr {
func prefixColNames(ctx *plancontext.PlanningContext, tblName sqlparser.TableName, e sqlparser.Expr) sqlparser.Expr {
return sqlparser.CopyOnRewrite(e, nil, func(cursor *sqlparser.CopyOnWriteCursor) {
col, ok := cursor.Node().(*sqlparser.ColName)
if !ok {
return
}
col.Qualifier = tblName
}, nil).(sqlparser.Expr)
cursor.Replace(sqlparser.NewColNameWithQualifier(col.Name.String(), tblName))
}, ctx.SemTable.CopySemanticInfo).(sqlparser.Expr)
}

func createProjectionWithTheseColumns(
Expand Down
32 changes: 19 additions & 13 deletions go/vt/vtgate/planbuilder/operators/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func createFKCascadeOp(ctx *plancontext.PlanningContext, parentOp ops.Operator,
fkChildren = append(fkChildren, fkChild)
}

selectionOp, err := createSelectionOp(ctx, selectExprs, updStmt.TableExprs, updStmt.Where, updStmt.OrderBy, nil, sqlparser.ForUpdateLock)
selectionOp, err := createSelectionOp(ctx, selectExprs, updStmt.TableExprs, updStmt.Where, updStmt.OrderBy, nil, sqlparser.ForUpdateLockNoWait)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -482,7 +482,9 @@ func buildChildUpdOpForSetNull(
// For example, if we are setting `update parent cola = :v1 and colb = :v2`, then on the child, the where condition would look something like this -
// `:v1 IS NULL OR :v2 IS NULL OR (child_cola, child_colb) NOT IN ((:v1,:v2))`
// So, if either of :v1 or :v2 is NULL, then the entire condition is true (which is the same as not having the condition when :v1 or :v2 is NULL).
compExpr := nullSafeNotInComparison(ctx.SemTable.GetUpdateExpressionsForFk(fk.String(updatedTable)), fk, updatedTable.GetTableName(), nonLiteralUpdateInfo)
updateExprs := ctx.SemTable.GetUpdateExpressionsForFk(fk.String(updatedTable))
compExpr := nullSafeNotInComparison(ctx,
updateExprs, fk, updatedTable.GetTableName(), nonLiteralUpdateInfo, false /* appendQualifier */)
if compExpr != nil {
childWhereExpr = &sqlparser.AndExpr{
Left: childWhereExpr,
Expand Down Expand Up @@ -605,8 +607,8 @@ func createFkVerifyOpForParentFKForUpdate(ctx *plancontext.PlanningContext, updS
Right: sqlparser.NewColNameWithQualifier(pFK.ChildColumns[idx].String(), childTbl),
}
} else {
notEqualColNames = append(notEqualColNames, prefixColNames(childTbl, matchedExpr.Name))
prefixedMatchExpr := prefixColNames(childTbl, matchedExpr.Expr)
notEqualColNames = append(notEqualColNames, prefixColNames(ctx, childTbl, matchedExpr.Name))
prefixedMatchExpr := prefixColNames(ctx, childTbl, matchedExpr.Expr)
notEqualExprs = append(notEqualExprs, prefixedMatchExpr)
joinExpr = &sqlparser.ComparisonExpr{
Operator: sqlparser.EqualOp,
Expand Down Expand Up @@ -641,7 +643,7 @@ func createFkVerifyOpForParentFKForUpdate(ctx *plancontext.PlanningContext, updS
}
// add existing where condition on the update statement
if updStmt.Where != nil {
whereCond = &sqlparser.AndExpr{Left: whereCond, Right: prefixColNames(childTbl, updStmt.Where.Expr)}
whereCond = &sqlparser.AndExpr{Left: whereCond, Right: prefixColNames(ctx, childTbl, updStmt.Where.Expr)}
}
return createSelectionOp(ctx,
sqlparser.SelectExprs{sqlparser.NewAliasedExpr(sqlparser.NewIntLiteral("1"), "")},
Expand All @@ -655,7 +657,7 @@ func createFkVerifyOpForParentFKForUpdate(ctx *plancontext.PlanningContext, updS
sqlparser.NewWhere(sqlparser.WhereClause, whereCond),
nil,
sqlparser.NewLimitWithoutOffset(1),
sqlparser.ShareModeLock)
sqlparser.ForShareLockNoWait)
}

// Each child foreign key constraint is verified by a join query of the form:
Expand Down Expand Up @@ -696,7 +698,7 @@ func createFkVerifyOpForChildFKForUpdate(ctx *plancontext.PlanningContext, updSt
var whereCond sqlparser.Expr
// add existing where condition on the update statement
if updStmt.Where != nil {
whereCond = prefixColNames(parentTbl, updStmt.Where.Expr)
whereCond = prefixColNames(ctx, parentTbl, updStmt.Where.Expr)
}

// We don't want to fail the RESTRICT for the case where the parent columns remains unchanged on the update.
Expand All @@ -708,7 +710,7 @@ func createFkVerifyOpForChildFKForUpdate(ctx *plancontext.PlanningContext, updSt
// For example, if we are setting `update child cola = :v1 and colb = :v2`, then on the parent, the where condition would look something like this -
// `:v1 IS NULL OR :v2 IS NULL OR (cola, colb) NOT IN ((:v1,:v2))`
// So, if either of :v1 or :v2 is NULL, then the entire condition is true (which is the same as not having the condition when :v1 or :v2 is NULL).
compExpr := nullSafeNotInComparison(updStmt.Exprs, cFk, parentTbl, nil)
compExpr := nullSafeNotInComparison(ctx, updStmt.Exprs, cFk, parentTbl, nil /* nonLiteralUpdateInfo */, true /* appendQualifier */)
if compExpr != nil {
whereCond = sqlparser.AndExpressions(whereCond, compExpr)
}
Expand All @@ -725,15 +727,15 @@ func createFkVerifyOpForChildFKForUpdate(ctx *plancontext.PlanningContext, updSt
sqlparser.NewWhere(sqlparser.WhereClause, whereCond),
nil,
sqlparser.NewLimitWithoutOffset(1),
sqlparser.ShareModeLock)
sqlparser.ForShareLockNoWait)
}

// nullSafeNotInComparison is used to compare the child columns in the foreign key constraint aren't the same as the updateExpressions exactly.
// This comparison has to be null safe so we create an expression which looks like the following for a query like `update child cola = :v1 and colb = :v2` -
// This comparison has to be null safe, so we create an expression which looks like the following for a query like `update child cola = :v1 and colb = :v2` -
// `:v1 IS NULL OR :v2 IS NULL OR (cola, colb) NOT IN ((:v1,:v2))`
// So, if either of :v1 or :v2 is NULL, then the entire condition is true (which is the same as not having the condition when :v1 or :v2 is NULL)
// This expression is used in cascading SET NULLs and in verifying whether an update should be restricted.
func nullSafeNotInComparison(updateExprs sqlparser.UpdateExprs, cFk vindexes.ChildFKInfo, parentTbl sqlparser.TableName, nonLiteralUpdateInfo []engine.NonLiteralUpdateInfo) sqlparser.Expr {
func nullSafeNotInComparison(ctx *plancontext.PlanningContext, updateExprs sqlparser.UpdateExprs, cFk vindexes.ChildFKInfo, parentTbl sqlparser.TableName, nonLiteralUpdateInfo []engine.NonLiteralUpdateInfo, appendQualifier bool) sqlparser.Expr {
var valTuple sqlparser.ValTuple
var updateValues sqlparser.ValTuple
for idx, updateExpr := range updateExprs {
Expand All @@ -742,12 +744,16 @@ func nullSafeNotInComparison(updateExprs sqlparser.UpdateExprs, cFk vindexes.Chi
if sqlparser.IsNull(updateExpr.Expr) {
return nil
}
childUpdateExpr := prefixColNames(parentTbl, updateExpr.Expr)
childUpdateExpr := prefixColNames(ctx, parentTbl, updateExpr.Expr)
if len(nonLiteralUpdateInfo) > 0 && nonLiteralUpdateInfo[idx].UpdateExprBvName != "" {
childUpdateExpr = sqlparser.NewArgument(nonLiteralUpdateInfo[idx].UpdateExprBvName)
}
updateValues = append(updateValues, childUpdateExpr)
valTuple = append(valTuple, sqlparser.NewColNameWithQualifier(cFk.ChildColumns[colIdx].String(), cFk.Table.GetTableName()))
if appendQualifier {
valTuple = append(valTuple, sqlparser.NewColNameWithQualifier(cFk.ChildColumns[colIdx].String(), cFk.Table.GetTableName()))
} else {
valTuple = append(valTuple, sqlparser.NewColName(cFk.ChildColumns[colIdx].String()))
}
}
}

Expand Down
Loading
Loading