Skip to content

Commit

Permalink
Improve error handling
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 23, 2025
1 parent 3964c7c commit 49c1105
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 15 deletions.
38 changes: 28 additions & 10 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2915,8 +2915,10 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
return handleError("failed to migrate the workflow streams", err)
}
if cancel {
sw.cancelMigration(ctx, sm)
return 0, sw.logs(), nil
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return 0, sw.logs(), err
}

// We stop writes on the source before stopping the source streams so that the catchup time
Expand All @@ -2928,7 +2930,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
// we actually stop them.
ts.Logger().Infof("Stopping source writes")
if err := sw.stopSourceWrites(ctx); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
}

Expand All @@ -2946,7 +2950,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource)
}
}
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Wrap(err, fmt.Sprintf("(%v)", cerr))
}
return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err)
}

Expand All @@ -2956,7 +2962,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
// the tablet's deny list check and the first mysqld side table lock.
for cnt := 1; cnt <= lockTablesCycles; cnt++ {
if err := ts.executeLockTablesOnSource(ctx); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Wrap(err, fmt.Sprintf("(%v)", cerr))
}
return handleError(fmt.Sprintf("failed to execute LOCK TABLES (attempt %d of %d) on sources", cnt, lockTablesCycles), err)
}
// No need to UNLOCK the tables as the connection was closed once the locks were acquired
Expand All @@ -2977,7 +2985,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}
ts.Logger().Infof("Waiting for streams to catchup")
if err := sw.waitForCatchup(ctx, waitTimeout); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError("failed to sync up replication between the source and target", err)
}

Expand All @@ -2986,7 +2996,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}
ts.Logger().Infof("Migrating streams")
if err := sw.migrateStreams(ctx, sm); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError("failed to migrate the workflow streams", err)
}

Expand All @@ -2995,7 +3007,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}
ts.Logger().Infof("Resetting sequences")
if err := sw.resetSequences(ctx); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError("failed to reset the sequences", err)
}

Expand All @@ -3004,7 +3018,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}
ts.Logger().Infof("Creating reverse streams")
if err := sw.createReverseVReplication(ctx); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError("failed to create the reverse vreplication streams", err)
}

Expand All @@ -3019,7 +3035,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
initSeqCtx, cancel := context.WithTimeout(ctx, waitTimeout/2)
defer cancel()
if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/workflow/switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ func (r *switcher) stopStreams(ctx context.Context, sm *StreamMigrator) ([]strin
return sm.StopStreams(ctx)
}

func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) {
r.ts.cancelMigration(ctx, sm)
func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) error {
return r.ts.cancelMigration(ctx, sm)
}

func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error) {
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtctl/workflow/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,9 @@ func (dr *switcherDryRun) stopStreams(ctx context.Context, sm *StreamMigrator) (
return nil, nil
}

func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) {
func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) error {
dr.drLog.Log("Cancel migration as requested")
return nil
}

func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string, _ ...topo.LockOption) (context.Context, func(*error), error) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/switcher_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

type iswitcher interface {
lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error)
cancelMigration(ctx context.Context, sm *StreamMigrator)
cancelMigration(ctx context.Context, sm *StreamMigrator) error
stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error)
stopSourceWrites(ctx context.Context) error
waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error
Expand Down
11 changes: 10 additions & 1 deletion go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1137,8 +1137,9 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error {

// cancelMigration attempts to revert all changes made during the migration so that we can get back to the
// state when traffic switching (or reversing) was initiated.
func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) {
func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) error {
var err error
cancelErrs := &concurrency.AllErrorRecorder{}

if ctx.Err() != nil {
// Even though we create a new context later on we still record any context error:
Expand All @@ -1162,6 +1163,7 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
}
if err != nil {
cancelErrs.RecordError(fmt.Errorf("could not revert denied tables / shard access: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
}

Expand All @@ -1174,13 +1176,20 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
return err
})
if err != nil {
cancelErrs.RecordError(fmt.Errorf("could not restart vreplication: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
}

err = ts.deleteReverseVReplication(cmCtx)
if err != nil {
cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
}

if cancelErrs.HasErrors() {
return vterrors.Wrap(cancelErrs.AggrError(vterrors.Aggregate), "cancel migration failed, manual cleanup work may be necessary")
}
return nil
}

func (ts *trafficSwitcher) freezeTargetVReplication(ctx context.Context) error {
Expand Down

0 comments on commit 49c1105

Please sign in to comment.